Wednesday, October 7, 2020

PySpark - Usage of UDF function

 You can't directly reference a DataFrame (or an RDD) from inside a UDF

A DataFrame object is a handle on your driver that Spark uses to represent the data and the actions that will happen out on the cluster. The code inside your UDFs runs out on the cluster at a time of Spark's choosing. Spark does this by serializing that code, making copies of any variables included in the closure, and sending them out to each worker.

Option 1:

Use the constructs Spark provides in its API to join/combine the two DataFrames. If one of the data sets is small, you can manually send the data out in a broadcast variable and then access it from your UDF. Otherwise, just create the two DataFrames and use a join operation to combine them.
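A minimal sketch of the broadcast approach (the lookup values and column names below are made up just for illustration):

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

# hypothetical small lookup table, kept on the driver as a plain dict
lookup = {"TX": "Texas", "NM": "New Mexico"}

# broadcast the dict so every worker gets a read-only copy
lookup_bc = spark.sparkContext.broadcast(lookup)

@F.udf('string')
def state_name(state_code):
    # accessing the broadcast value inside the UDF is fine, because a plain
    # dict (not a DataFrame) is what gets serialized and shipped out
    return lookup_bc.value.get(state_code)

df = spark.createDataFrame([("TX",), ("NM",)], ["state_code"])
df.withColumn("state_name", state_name("state_code")).show()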

Option 2:

Alternatively, convert one of the Spark DataFrames to a pandas DataFrame, like I did in the function below.

Here I need a CSV file to hold the lookup values:

# import state_county_lookup data
from pyspark.sql.functions import concat

file_path = "xxx/xxx/"
file_name = "xxxx.csv"
full_file_path = "s3://{}/{}/{}".format(bucket_name, file_path, file_name)  # bucket_name is defined elsewhere

# set 'inferSchema' to 'false' to retain leading zeros of the data
df_lookup = spark.read.format('csv') \
    .option("delimiter", ",") \
    .option("header", "true") \
    .option("inferSchema", "false") \
    .load(full_file_path)

# build the lookup key by concatenating the state id and the county code
df_lookup = df_lookup.withColumn("county_id", concat(df_lookup.state_id, df_lookup.county_code))

# convert the Spark DataFrame to a pandas DataFrame so it can be used inside the UDF
pd_lookup = df_lookup.toPandas()


Here I have a Python UDF that uses the lookup data:

# function to clean api
@F.udf('string')
def get_api(state_code_src, api):
    ..........
    # this line fails -- it references the Spark DataFrame df_lookup inside the UDF
    county_id_lookup_collect = df_lookup.select('county_id').filter(F.col('state_code') == state_code_src).collect()
    # this line works -- it uses the pandas DataFrame pd_lookup instead
    pd_lookup_data = pd_lookup.loc[pd_lookup['state_code'] == state_code_src]
    ..........


Initially, the function did not work and raised this error: PicklingError: Could not serialize object: TypeError: can't pickle _thread.RLock objects.

Then, after I changed the lookup inside the UDF from the Spark DataFrame to the pandas DataFrame (the first lookup line replaced by the second), it worked!
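For reference, here is a minimal self-contained sketch of the working pattern (the column names and lookup values are hypothetical, not my real data):

import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# hypothetical lookup data, already converted to pandas so it pickles cleanly
pd_lookup = pd.DataFrame({"state_code": ["TX", "NM"],
                          "county_id": ["48123", "35001"]})

@F.udf('string')
def get_county_id(state_code_src):
    # a pandas lookup inside the UDF works, because the pandas DataFrame is a
    # plain local object that Spark can serialize and send to the workers
    rows = pd_lookup.loc[pd_lookup["state_code"] == state_code_src, "county_id"]
    return rows.iloc[0] if not rows.empty else None

df = spark.createDataFrame([("TX",), ("NM",)], ["state_code"])
df.withColumn("county_id", get_county_id("state_code")).show()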

Friday, September 11, 2020

Python - Split one CSV file into multiple ones

 

import csv
import os


def split(filehandler, delimiter=',', row_limit=500000,
          output_name_template='output_%s.csv', output_path='.', keep_headers=True):
    """Split a large CSV file into pieces of at most row_limit rows each."""
    reader = csv.reader(filehandler, delimiter=delimiter)
    current_piece = 1
    current_out_path = os.path.join(
        output_path,
        output_name_template % current_piece
    )
    current_out_writer = csv.writer(open(current_out_path, 'w', newline=''), delimiter=delimiter)
    current_limit = row_limit
    if keep_headers:
        headers = next(reader)
        current_out_writer.writerow(headers)
    for i, row in enumerate(reader):
        if i + 1 > current_limit:
            # start a new output piece and raise the limit for the next chunk
            current_piece += 1
            current_limit = row_limit * current_piece
            current_out_path = os.path.join(
                output_path,
                output_name_template % current_piece
            )
            current_out_writer = csv.writer(open(current_out_path, 'w', newline=''), delimiter=delimiter)
            if keep_headers:
                current_out_writer.writerow(headers)
        current_out_writer.writerow(row)


file_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), "your_file_name.csv")
split(open(file_path, 'r', newline=''))

Thursday, September 10, 2020

New era

 Recently, I've been trying to learn PySpark and related tools, like AWS EMR, AWS Glue, AWS QuickSight, SageMaker, Databricks, and Athena. It is a new way of processing data (well, compared to what I've been doing), more of a cloud approach to processing. Very excited, and I'll continue learning... more posts to come :)


Wednesday, July 15, 2020

Python / S3 - Functions to list keys in an S3 bucket using Python


from glob import glob
import boto3


class Versions:
    """Gets the latest version from local or s3."""

    @staticmethod
    def get_latest_version_from_local(path):
        """Gets the latest version from local."""
        versions_paths = glob((path + "/*"), recursive=True)
        versions = []
        for versions_path in versions_paths:
            # the version name is the last element of the path
            split_path = versions_path.rstrip('/').split("/")
            version = split_path.pop()
            versions.append(version)
        versions.sort(reverse=True)
        return versions[0]

    @staticmethod
    def get_latest_version_from_s3(bucket_name, path):
        """Gets the latest version from s3."""
        key = path.rstrip('/').split("/").pop()
        s3 = boto3.client('s3')
        response = s3.list_objects_v2(
            Bucket=bucket_name,
            Prefix=key,
            MaxKeys=100)
        versions = []
        for obj in response['Contents']:
            split_path = obj['Key'].rstrip('/').split("/")
            versions.append(split_path[1])
        versions.sort(reverse=True)
        return versions[0]

    @staticmethod
    def get_all_s3_keys(bucket_name):
        """List all keys in an S3 bucket and return the latest matching one."""
        versions = []
        kwargs = {'Bucket': bucket_name}
        s3 = boto3.client('s3')
        while True:
            resp = s3.list_objects_v2(**kwargs)
            for obj in resp['Contents']:
                # keep only the well_production keys and skip the $folder$ markers
                if ('well_production' in str(obj['Key'])
                        and '$folder$' not in str(obj['Key'])):
                    versions.append(obj['Key'])
            try:
                # keep paging until there is no continuation token left
                kwargs['ContinuationToken'] = resp['NextContinuationToken']
            except KeyError:
                break
        versions.sort(reverse=True)
        return versions[0]
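Example usage (the bucket name and paths below are placeholders, not real ones):

if __name__ == "__main__":
    # hypothetical local path and bucket, just to show how the helpers are called
    latest_local = Versions.get_latest_version_from_local("/data/my_dataset")
    latest_s3 = Versions.get_latest_version_from_s3("my-bucket", "my_dataset/")
    latest_key = Versions.get_all_s3_keys("my-bucket")
    print(latest_local, latest_s3, latest_key)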

Thursday, July 9, 2020

SQL - table-valued function to get all months between two dates


CREATE FUNCTION [dbo].[GetMonths](@StartDate DATETIME, @EndDate DATETIME)
RETURNS @MonthList TABLE(MonthValue VARCHAR(15) NOT NULL)
AS
BEGIN
    --Variable used to hold each new date value
    DECLARE @DateValue DATETIME

    --Start with the starting date in the range
    SET @DateValue = @StartDate

    --Load the output table with each new date
    WHILE @DateValue <= @EndDate
    BEGIN
        INSERT INTO @MonthList(MonthValue)
        SELECT CAST(@DateValue AS DATE)

        --Move to the next month
        SET @DateValue = DATEADD(mm, 1, @DateValue)
    END

    RETURN
END

Monday, July 6, 2020

SQL - Track where a Stored Procedure is being used


 SELECT o.name
 FROM syscomments AS c
 INNER JOIN sysobjects AS o
 ON c.id = o.id
 WHERE c.text LIKE '%stored_procedure_name%';

Wednesday, July 1, 2020

Regular Expression - good practice and a good place to check


A good place to check whether your regular expression does what you want is https://regex101.com/

e.g., I have a list of companies extracted from a website's filter, where each entry ends with a five-digit company number in parentheses.

By using a regular expression, I can easily extract the company numbers in two lines:


import re

# company_list holds the scraped elements; each item's .text ends with "(NNNNN)"
regex = re.compile(r'.*\((\d{5})\)')
company_value_list = [regex.match(item.text).group(1) for item in company_list
                      if regex.match(item.text) is not None]


So the result is a list of company numbers:

['39227', '65860', '39639', '68942', '68979', '68998', '68938', '62950'.....]
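And for a quick self-contained test (with made-up company strings, not the real list above):

import re

# hypothetical stand-ins for the scraped elements' text
samples = ["Acme Oil Co (12345)", "Globex Energy (67890)", "No number here"]

regex = re.compile(r'.*\((\d{5})\)')
numbers = [regex.match(s).group(1) for s in samples if regex.match(s)]
print(numbers)   # ['12345', '67890']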