Implicit Schema for pandas_udf in PySpark?
This answer nicely explains how to use PySpark's groupby and pandas_udf to do custom aggregations. However, I cannot declare my schema manually as shown in that part of the answer; is there a way to infer it implicitly?
Solution 1:
Based on Sanxofon's comment, I got an idea of how to implement this myself:
from pyspark.sql.types import *

mapping = {"float64": DoubleType,
           "object": StringType,
           "int64": IntegerType}  # Incomplete - extend with your types.

def createUDFSchemaFromPandas(dfp):
    column_types = [StructField(key, mapping[str(dfp.dtypes[key])]()) for key in dfp.columns]
    schema = StructType(column_types)
    return schema
What I do is take a small sample pandas df, pass it through my function, and build the schema from what comes back:
dfp = df_total.limit(100).toPandas()
df_return = my_UDF_function(dfp)
schema = createUDFSchemaFromPandas(df_return)
This seems to work for me. The catch is that it is somewhat circular: you need to run the function to infer the schema, but you need the schema to declare the function as a UDF. I solved this by creating a "wrapper" UDF that simply passes the dataframe through.
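For completeness, here is a minimal sketch of that wrapper pattern, assuming the names from above (df_total, my_UDF_function) plus a hypothetical grouping column "id". It uses the Spark 2.x grouped-map API; on Spark 3 you can skip the decorator and call df_total.groupby("id").applyInPandas(my_UDF_function, schema=schema) directly:

from pyspark.sql.functions import pandas_udf, PandasUDFType

# Infer the schema once, driver-side, from a small sample run.
dfp = df_total.limit(100).toPandas()
schema = createUDFSchemaFromPandas(my_UDF_function(dfp))

# The wrapper simply forwards each group to the plain-pandas function;
# only at this point do we have the schema needed to declare the UDF.
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def wrapper_udf(pdf):
    return my_UDF_function(pdf)

result = df_total.groupby("id").apply(wrapper_udf)  # "id" is illustrative

Note that the inferred schema is only as good as the sample: if a column in the first 100 rows happens to be all-null or of an unexpected dtype, the mapping lookup will fail or mistype it.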
Solution 2:
Slightly modifying @Thomas's answer, I did the following. Since df.dtypes returns a list of tuples (at least for a Spark DataFrame) rather than a dictionary, I replaced str(dfp.dtypes[key]) with dict(df.dtypes)[key]:
def udf_schema_from_pandas(df):
    # `mapping` is the dtype-to-Spark-type dict defined in Solution 1.
    column_types = [StructField(key, mapping[dict(df.dtypes)[key]]()) for key in df.columns]
    schema = StructType(column_types)
    return schema
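To see what this replacement relies on, here is a small throwaway pandas frame (column names are illustrative). One caveat: dict(df.dtypes) may give numpy dtype objects rather than plain strings as values, depending on whether df is a pandas or Spark DataFrame; if the mapping lookup then fails with a KeyError, normalise with str() so the keys match the string keys of mapping:

import pandas as pd

pdf = pd.DataFrame({"price": [1.0, 2.5], "name": ["a", "b"], "qty": [1, 2]})

# The values here are numpy dtype objects, e.g.
# {'price': dtype('float64'), 'name': dtype('object'), 'qty': dtype('int64')}
print(dict(pdf.dtypes))

# If mapping[...] raises KeyError because the dtype objects don't hash
# like their string names, normalise with str() first:
dtype_names = {col: str(t) for col, t in dict(pdf.dtypes).items()}
print(dtype_names)  # {'price': 'float64', 'name': 'object', 'qty': 'int64'}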