
Implicit Schema for pandas_udf in PySpark?

This answer nicely explains how to use PySpark's groupby and pandas_udf to do custom aggregations. However, I cannot possibly declare my schema manually, as shown in this part of the…

Solution 1:

Based on Sanxofon's comment, I got an idea of how to implement this myself:

from pyspark.sql.types import StructType, StructField, DoubleType, StringType, IntegerType

# Map pandas dtype names to Spark type constructors.
# Incomplete - extend with your types.
mapping = {"float64": DoubleType,
           "object": StringType,
           "int64": IntegerType}

def createUDFSchemaFromPandas(dfp):
  # Build one StructField per column from that column's pandas dtype.
  column_types = [StructField(key, mapping[str(dfp.dtypes[key])]()) for key in dfp.columns]
  schema = StructType(column_types)
  return schema

What I do is get a sample pandas df, pass it to the function, and see what it returns:

dfp = df_total.limit(100).toPandas()           # small sample as a pandas df
df_return = my_UDF_function(dfp)               # run the plain-Python function once
schema = createUDFSchemaFromPandas(df_return)  # infer the Spark schema from its output

This seems to work for me. The problem is that it is somewhat circular: you need to run the function to get the schema, but you need the schema to declare the function as a pandas_udf. I solved this by creating a "wrapper" UDF that simply passes the dataframe through to the plain function, as sketched below.
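For illustration, here is a minimal sketch of that wrapper pattern, assuming the Spark 2.x GROUPED_MAP pandas_udf API and a hypothetical grouping column "group_col" (my_UDF_function is the plain-Python aggregation from above):

from pyspark.sql.functions import pandas_udf, PandasUDFType

# Infer the schema once from a sample run of the plain function (as above).
dfp = df_total.limit(100).toPandas()
schema = createUDFSchemaFromPandas(my_UDF_function(dfp))

# Only this thin wrapper is declared as a pandas_udf, so the schema
# can be computed before the decorator needs it.
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def my_UDF_wrapper(pdf):
    return my_UDF_function(pdf)

result = df_total.groupby("group_col").apply(my_UDF_wrapper)  # "group_col" is hypothetical

On Spark 3.0+ the same effect can be achieved without the wrapper via df_total.groupby("group_col").applyInPandas(my_UDF_function, schema=schema).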

Solution 2:

Slightly modifying @Thomas's answer, I did the following. Since df.dtypes returns a list of tuples (at least in the latest pandas version) and not a dictionary, I replaced str(dfp.dtypes[key]) with dict(df.dtypes)[key]:

def udf_schema_from_pandas(df):
  # Reuses the `mapping` dict from Solution 1; its keys must match
  # the dtype strings that dict(df.dtypes) yields.
  column_types = [StructField(key, mapping[dict(df.dtypes)[key]]()) for key in df.columns]
  schema = StructType(column_types)
  return schema
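For completeness, a sketch of how this variant might be wired up end to end, assuming the Spark 3.0+ applyInPandas API and the same hypothetical my_UDF_function and "group_col" names as above:

sample = df_total.limit(100).toPandas()
schema = udf_schema_from_pandas(my_UDF_function(sample))
result = df_total.groupby("group_col").applyInPandas(my_UDF_function, schema=schema)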
