You can't directly reference a dataframe (or an RDD) from inside a UDF.
The DataFrame object is a handle on your driver that Spark uses to represent the data and the actions that will happen out on the cluster. The code inside your UDFs runs out on the cluster at a time of Spark's choosing. Spark does this by serializing that code, making copies of any variables captured in its closure, and sending them out to each worker.
Option 1:
Use the constructs Spark provides in its API to join or combine the two DataFrames. If one of the data sets is small, you can manually send out the data in a broadcast variable and then access it from your UDF. Otherwise, create the two DataFrames and use the join operation to combine them.
Option 2:
Alternatively, convert one of the Spark DataFrames to a pandas DataFrame, as I did in the function below.
Here I need a CSV file to hold the lookup values:
from pyspark.sql import functions as F

# import state_county_lookup data (bucket_name is assumed to be defined earlier)
file_path = "xxx/xxx/"
file_name = "xxxx.csv"
full_file_path = "s3://{}/{}/{}".format(bucket_name, file_path, file_name)
# set 'inferSchema' to 'false' to retain leading zeros of data
df_lookup = spark.read.format('csv').option("delimiter", ",").option("header", "true").option("inferSchema", "false").load(full_file_path)
df_lookup = df_lookup.withColumn("county_id", F.concat(df_lookup.state_id, df_lookup.county_code))
# collect the lookup table to the driver as a pandas DataFrame
pd_lookup = df_lookup.toPandas()
Here is the Python UDF that uses the lookup data:
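# function to clean api
@F.udf('string')
def get_api(state_code_src, api):
    ..........
    # original (failing) line: references the Spark DataFrame df_lookup inside the UDF
    # county_id_lookup_collect = df_lookup.select('county_id').filter(F.col('state_code') == state_code_src).collect()
    # replacement (working) line: uses the pandas DataFrame pd_lookup instead
    pd_lookup_data = pd_lookup.loc[pd_lookup['state_code'] == state_code_src]
    ..........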
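Initially, the function did not work. It raised the error: PicklingError: Could not serialize object: TypeError: can't pickle _thread.RLock objects. The cause was that the UDF referenced the Spark DataFrame df_lookup, which cannot be serialized and shipped to the workers.
Then, after I replaced the Spark DataFrame lookup (the df_lookup line) with the pandas DataFrame lookup (the pd_lookup line), it worked!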
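To get a feel for where that error comes from, here is a small sketch using only the standard library, no Spark involved. The dict handle_like is a hypothetical stand-in for a driver-side object that holds a thread lock somewhere inside it. I am not claiming this is how a Spark DataFrame is laid out internally, only that the error message points at an RLock. Plain data, like rows pulled out of a lookup table, pickles without trouble.

```python
import pickle
import threading

# Hypothetical stand-in for a driver-side handle that owns a lock.
handle_like = {"name": "df_lookup", "lock": threading.RLock()}

# Plain data, similar in spirit to rows taken from a small lookup table.
plain_rows = [{"state_code": "TX", "county_id": "48001"}]


def try_pickle(obj):
    """Return 'ok' if obj can be pickled, else a short error description."""
    try:
        pickle.dumps(obj)
        return "ok"
    except TypeError as exc:
        return "TypeError: {}".format(exc)


handle_result = try_pickle(handle_like)  # fails because of the RLock
rows_result = try_pickle(plain_rows)     # plain lists and dicts are fine

print(handle_result)
print(rows_result)
```

The exact wording of the TypeError differs between Python versions, but the idea is the same: anything the UDF closure captures has to survive serialization, so capture plain data (or a pandas DataFrame) rather than a Spark handle.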