Wednesday, October 28, 2020

PySpark - Map values

 Sometimes you want to define a hierarchy for your data sources, or simply rank how important certain values are. You can use the create_map() function for this.

  src_type = {"first_important": 1, "second_important": 2, "third_important": 3}

  src_type = F.create_map([F.lit(x) for x in chain(*src_type.items())])

The resulting map column looks like this:

+----------------------------------------------------------------------+
|Map(first_important -> 1, second_important -> 2, third_important -> 3)|
+----------------------------------------------------------------------+
When you use it, you can do something like:

df = df.withColumn(
    "is_type",
    F.when(src_type[F.col("tgt_type")] > src_type[F.col("src_type")], "True")
     .otherwise("False"),
)
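
Putting the pieces together, here is a minimal self-contained sketch, assuming a DataFrame with src_type and tgt_type columns (the sample rows are made up for illustration):

from itertools import chain

import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# sample data: each row has a source type and a target type
df = spark.createDataFrame(
    [("first_important", "second_important"),
     ("third_important", "first_important")],
    ["src_type", "tgt_type"],
)

# build a literal map column from a Python dict
priority = {"first_important": 1, "second_important": 2, "third_important": 3}
priority_map = F.create_map([F.lit(x) for x in chain(*priority.items())])

# look up both columns in the map and compare their ranks
df = df.withColumn(
    "is_type",
    F.when(priority_map[F.col("tgt_type")] > priority_map[F.col("src_type")], "True")
     .otherwise("False"),
)
df.show(truncate=False)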

Wednesday, October 7, 2020

PySpark - Read Shape file from S3 and Mount S3 as File System


Advantages of Mounting Amazon S3 as a File System

Mounting an Amazon S3 bucket as a file system means that you can use all your existing tools and applications to interact with the bucket, performing read/write operations on files and folders. Multiple Amazon EC2 instances can mount and concurrently access data in Amazon S3 this way, just like a shared file system.

Why use an Amazon S3 file system? Any application interacting with the mounted drive doesn't have to worry about transfer protocols, security mechanisms, or Amazon S3-specific API calls. In some cases, mounting Amazon S3 as a drive on an application server can make creating a distributed file store extremely easy. For example, when creating a photo upload application, you can have it store data on a fixed path in a file system, and when deploying you can mount an Amazon S3 bucket on that fixed path. This way the application writes all files into the bucket without you having to worry about Amazon S3 integration at the application level. Another major advantage is that legacy applications can scale in the cloud with no source code changes, since they can simply be configured to use a local path where the Amazon S3 bucket is mounted. This technique is also very helpful when you want to collect logs from various servers in a central location for archiving.
After mounting S3 as a local file system, you can use pandas, geopandas, or other libraries to access files with a regular path, for example:
Location = geopandas.read_file("/dbfs/mnt/bucket-name/geofactor/data/shapefilename.shp")
In my case, however, we had limitations on mounting S3 as well as permission issues (only Spark could read from the S3 bucket). On top of that, the data is a shapefile, which consists of .dbf, .prj, .shp and .shx files that must be read together, so I zipped them up. And Spark cannot read this zip file directly.

The workaround, instead of mounting S3, is to read the zip file with boto3 into an in-memory BytesIO buffer:

buffer = BytesIO(zip_obj.get()["Body"].read())
zipfile = ZipFile(buffer)
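
For reference, here is a fuller sketch of that workaround; the bucket name, key, and shapefile name are placeholders, and it assumes the zip contains the .shp, .dbf, .prj and .shx members of a single shapefile:

import tempfile
from io import BytesIO
from zipfile import ZipFile

import boto3
import geopandas

s3 = boto3.resource("s3")
zip_obj = s3.Object("my-bucket", "geofactor/data/shapefile.zip")  # placeholder bucket/key

# read the whole zip into memory, since Spark cannot read this file directly
buffer = BytesIO(zip_obj.get()["Body"].read())

# extract the shapefile members (.shp, .dbf, .prj, .shx) to a temp directory,
# because geopandas needs the sidecar files next to the .shp
with ZipFile(buffer) as zf, tempfile.TemporaryDirectory() as tmp_dir:
    zf.extractall(tmp_dir)
    gdf = geopandas.read_file(tmp_dir + "/shapefilename.shp")  # placeholder member name

print(gdf.head())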





PySpark - Two ways to use UDF

 Method 1:

Put the @F.udf('string') decorator on top of the function.
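
A minimal sketch of this style (the function, the DataFrame df, and the column name code are assumptions for illustration):

import pyspark.sql.functions as F

@F.udf('string')
def clean_code(value):
    # trim whitespace and upper-case the value; pass None through unchanged
    return value.strip().upper() if value is not None else None

df = df.withColumn("code_clean", clean_code(F.col("code")))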


Method 2:

Convert a plain Python function to a UDF:

from pyspark.sql.functions import *

from pyspark.sql.types import *

function_udf = udf(function, OutputType())

Then use the UDF on your Spark DataFrame.
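
For example (a sketch; the plain Python function, the DataFrame df, and the column names are assumptions):

from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

# a plain Python function, usable and testable outside of Spark
def clean_code(value):
    return value.strip().upper() if value is not None else None

# wrap it as a UDF, declaring the return type explicitly
clean_code_udf = udf(clean_code, StringType())

df = df.withColumn("code_clean", clean_code_udf(col("code")))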

PySpark - Usage of UDF function

 You can't directly reference a dataframe (or an RDD) from inside a UDF

The DataFrame object is a handle on your driver that Spark uses to represent the data and the actions that will happen out on the cluster. The code inside your UDFs will run out on the cluster at a time of Spark's choosing. Spark does this by serializing that code, making copies of any variables included in the closure, and sending them out to each worker.

Option 1:

Use the constructs Spark provides in its API to join/combine the two DataFrames. If one of the data sets is small, you can manually send out the data in a broadcast variable and then access it from your UDF. Otherwise, you can just create the two DataFrames and use a join operation to combine them.
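
A sketch of both approaches, assuming an existing SparkSession spark, a main DataFrame df, and a small lookup DataFrame df_lookup with state_code and county_id columns (first_county_id is a made-up helper):

from collections import defaultdict

import pyspark.sql.functions as F

# collect the small lookup table on the driver and broadcast it to the executors
county_ids_by_state = defaultdict(list)
for row in df_lookup.select("state_code", "county_id").collect():
    county_ids_by_state[row["state_code"]].append(row["county_id"])
lookup_bc = spark.sparkContext.broadcast(dict(county_ids_by_state))

@F.udf('string')
def first_county_id(state_code):
    # read the broadcast copy inside the UDF instead of touching df_lookup itself
    ids = lookup_bc.value.get(state_code, [])
    return ids[0] if ids else None

df = df.withColumn("county_id", first_county_id(F.col("state_code")))

# alternatively, skip the UDF entirely and join the two DataFrames
df_joined = df.join(df_lookup.select("state_code", "county_id"), on="state_code", how="left")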

Option 2:

Alternatively, convert one of the Spark DataFrames to a pandas DataFrame, like I did in the function below.

Here I need a CSV to hold the lookup values:

# import state_county_lookup data
file_path = "xxx/xxx/"
file_name = "xxxx.csv"
full_file_path = "s3://{}/{}/{}".format(bucket_name, file_path, file_name)

# set 'inferSchema' to 'false' to retain leading zeros of data
df_lookup = (spark.read.format('csv')
             .option("delimiter", ",")
             .option("header", "true")
             .option("inferSchema", "false")
             .load(full_file_path))

df_lookup = df_lookup.withColumn("county_id", concat(df_lookup.state_id, df_lookup.county_code))

pd_lookup = df_lookup.toPandas()


Here I have a Python function, used as a UDF, that needs the lookup data:

# function to clean api
@F.udf('string')
def get_api(state_code_src, api):
    ..........
    county_id_lookup_collect = df_lookup.select('county_id').filter(F.col('state_code') == state_code_src).collect()
    pd_lookup_data = pd_lookup.loc[pd_lookup['state_code'] == state_code_src]
    ..........


Initially the function did not work and raised this error: PicklingError: Could not serialize object: TypeError: can't pickle _thread.RLock objects. The df_lookup.select(...).collect() line is the problem, because it references the Spark DataFrame from inside the UDF. After I replaced it with the pandas lookup (the pd_lookup.loc[...] line), it worked!