Thursday, March 18, 2021

Databricks Spark Cheatsheet Summary

 

Import

dbutils.library.installPyPI("xlsxwriter")
from pyspark.sql.functions import *
from delta.tables import DeltaTable
from pyspark.sql import DataFrame
import pyspark.sql.functions as F
from pyspark.sql import types as T
from pyspark.sql.functions import (col, current_timestamp, from_json,
    from_unixtime, lag, lead, lit, mean, stddev, max, regexp_replace)
from pyspark.sql.session import SparkSession
from pyspark.sql.streaming import DataStreamWriter
from pyspark.sql.window import Window

Files

from delta.tables import *
# Bypass the retention duration check so that an immediate vacuum is allowed
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
deltaTable = DeltaTable.forPath(spark, deltaPath)
deltaTable.vacuum(0)
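# A follow-up sketch (assumes the same deltaTable as above): turn the retention duration
# check back on after the aggressive vacuum, and run a vacuum with the default retention period.
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "true")
deltaTable.vacuum()  # the default retention period (7 days) applies when no argument is given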
dbutils.fs.cp(tempfile,fileprefix+".csv")
dbutils.fs.put(result_path, sms_m, overwrite=True)
dbutils.fs.mv("file:/databricks/driver/health_tracker_data_2020_1.json", 
health_tracker + "raw/health_tracker_data_2020_1.json")
dbutils.fs.rm(health_tracker + "processed", recurse=True)
%fs rm -r /dbacademy/DLRS/healthtracker/gold/health_tracker_user_analytics
display(dbutils.fs.ls(rawPath))
print(dbutils.fs.head(suppliersPath))
 
# Check that the file you need (file_2020_1) is present in the directory (rawPath)
assert file_2020_1 in [item.name for item in dbutils.fs.ls(rawPath)], \
    "File not present in Raw Path"
 
# Mount using the s3a prefix
dbutils.fs.mount(bucketURL, MOUNTPOINT)
# Unmount directory if previously mounted
if MOUNTPOINT in [mnt.mountPoint for mnt in dbutils.fs.mounts()]:
  dbutils.fs.unmount(MOUNTPOINT)
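# Hypothetical setup for the mount above (the secret scope, keys, and bucket name are
# placeholders, not from the original): one legacy pattern embeds the AWS access key and a
# URL-encoded secret key in the s3a URL.
ACCESS_KEY = dbutils.secrets.get(scope="aws", key="access_key")
SECRET_KEY = dbutils.secrets.get(scope="aws", key="secret_key")
ENCODED_SECRET_KEY = SECRET_KEY.replace("/", "%2F")
bucketURL = "s3a://{}:{}@{}".format(ACCESS_KEY, ENCODED_SECRET_KEY, "my-demo-bucket")  # hypothetical bucket
MOUNTPOINT = "/mnt/s3demo"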

Load

1. General

df = spark.read.load("examples/src/main/resources/users.json", format="json")
df = spark.read.load("examples/src/main/resources/users.csv", format="csv", sep=";", inferSchema="true", header="true")
df = spark.read.load("examples/src/main/resources/users.parquet")
# pathGlobFilter is used to only include files with file names matching the pattern
df = spark.read.load("examples/src/main/resources/dir1", format="parquet", pathGlobFilter="*.parquet")
# recursiveFileLookup recursively loads files and disables partition inferring. Its default value is false.
# If the data source explicitly specifies the partitionSpec while recursiveFileLookup is true, an exception is thrown.
# To load all files recursively:
df = spark.read.format("parquet").option("recursiveFileLookup", "true").load("examples/src/main/resources/dir1")
df = spark.read.format("delta").option("versionAsOf", 0).load("examples/src/main/resources/dir1")
df = spark.read.format("xml").option("rootTag", "orders").option("rowTag", "purchase_item").option("inferSchema", "true").load(purchaseOrdersPath)
 
df = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
df = spark.read.table("health_tracker_processed")
 
df = spark.read.option("mergeSchema", "true").parquet("examples/src/main/resources/users.parquet")
df = spark.read.option("header", "true").option("inferSchema", "true").option("nullValue", "nan").csv("examples/src/main/resources/users.csv")
df = spark.read.option("inferSchema", "true")json("examples/src/main/resources/users.json")

2. Streaming

df_stream = spark.readStream.format("text").option("maxFilesPerTrigger", 1).load(rawPath)
df_stream = (spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(inputPath)
             .withColumn("INPUT_FILE_NAME", input_file_name())    # maintain the source file path
             .withColumn("PROCESSED_TIME", current_timestamp())   # add a processing timestamp at the time of processing
             .withWatermark("PROCESSED_TIME", "1 minute"))        # optional: window for out-of-order data
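 
# jsonSchema above is assumed to be defined elsewhere; a minimal sketch of what it might look like,
# borrowing the health-tracker field names used later in this sheet:
jsonSchema = T.StructType([
    T.StructField("name", T.StringType(), True),
    T.StructField("heartrate", T.DoubleType(), True),
    T.StructField("time", T.LongType(), True),
    T.StructField("device_id", T.IntegerType(), True),
])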

3. SQL

%sql CREATE DATABASE IF NOT EXISTS dbacademy
%sql USE dbacademy
%sql SET spark.sql.shuffle.partitions=8
-- MSCK REPAIR TABLE recovers all the partitions in the directory of a table and updates the Hive metastore
%sql MSCK REPAIR TABLE health_tracker_processed
%sql DESCRIBE DETAIL health_tracker_processed
%sql DESCRIBE HISTORY health_tracker_silver
%sql DESCRIBE EXTENDED health_tracker_silver
%sql CONVERT TO DELTA parquet.`/dbacademy/DLRS/healthtracker/silver` PARTITIONED BY (p_device_id double)
%sql OPTIMIZE sc_raw_intake.raw_wims_events
%sql CACHE TABLE device_data
%sql 
SELECT dc_id, device_type, temps, TRANSFORM(temps, t -> ((t * 9) div 5) + 32) AS `temps_F`
FROM device_data;
%sql
SELECT dc_id, device_type, co2_level, REDUCE(co2_level, 0, (c, acc) -> c + acc, acc -> (acc div size(co2_level))) AS average_co2_level
  FROM device_data
  SORT BY average_co2_level DESC;
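 
# The same higher-order functions can be used from the DataFrame API through expr()
# (a sketch assuming the same device_data columns; aggregate() is the standard Spark name for REDUCE):
df_hof = (spark.table("device_data")
          .withColumn("temps_F", F.expr("transform(temps, t -> ((t * 9) div 5) + 32)"))
          .withColumn("average_co2_level",
                      F.expr("aggregate(co2_level, 0, (c, acc) -> c + acc, acc -> (acc div size(co2_level)))")))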
%sql 
CREATE OR REPLACE TEMPORARY VIEW broken_readings
AS (
  SELECT COUNT(*) as broken_readings_count, dte FROM health_tracker_silver
  WHERE heartrate < 0
  GROUP BY dte
  ORDER BY dte
)
%sql
INSERT INTO health_tracker_silver
SELECT name,
       heartrate,
       CAST(FROM_UNIXTIME(time) AS TIMESTAMP) AS time,
       CAST(FROM_UNIXTIME(time) AS DATE) AS dte,
       device_id as p_device_id
FROM health_tracker_data_2020_02
%sql
DROP TABLE IF EXISTS health_tracker_silver;               -- ensures that if we run this again, it won't fail                                                        
CREATE TABLE health_tracker_silver                        
USING PARQUET                                             
PARTITIONED BY (p_device_id)                              -- column used to partition the data
LOCATION "/dbacademy/DLRS/healthtracker/silver"           -- location where the parquet files will be saved
AS (                                                      
  SELECT name,                                            -- query used to transform the raw data
         heartrate,                                       
         CAST(FROM_UNIXTIME(time) AS TIMESTAMP) AS time,  
         CAST(FROM_UNIXTIME(time) AS DATE) AS dte,        
         device_id AS p_device_id                         
  FROM health_tracker_data_2020_01   
)
spark.sql(
    """
DROP TABLE IF EXISTS xxx.xxxx
"""
)
 
spark.sql(
    f"""
CREATE TABLE xxx.xxx
USING DELTA
LOCATION "{destination_path}"
"""
)
spark.sql(f"CREATE DATABASE IF NOT EXISTS {cleaned_username}")
spark.sql(f"USE {cleaned_username}")
 
df.write.format("delta").mode("overwrite").saveAsTable("deltaReview")

Transform

1. select

df.select('xxxx.*')
df.select('operator').distinct()
dteWindow = Window.partitionBy("p_device_id").orderBy("dte")  # window used by lag/lead below
df.select(col("heartrate"), col("dte"), lag(col("heartrate")).over(dteWindow).alias("prev_amt"), lead(col("heartrate")).over(dteWindow).alias("next_amt"))
df.select(from_json(col("value"), json_schema_raw).alias("outer_json"))
df.select(lit("Tibco").alias("datasource"), current_timestamp().alias("ingesttime"), "value", current_timestamp().cast("date").alias("p_ingestdate"))
df.select("dte", "time", "device_type", "heartrate", lit(None).alias("name"), "p_device_id")
 
df.join(productsDF, on="product_name", how="inner")
df.join(df_planned, (df_actual.ITM_ID == df_planned.Item_code) & (df_actual.p_cre8date == df_planned.Date), how='left') 
 
df.createOrReplaceTempView("updates")
sql_query = "select count(*) as {}_count from {} where cast({} as date) = '{}'".format(stage, stage, ts_col, ts_date)
spark.sql(sql_query).show()
 
df.selectExpr(['*',  'cast(CRE8_TS as timestamp) as NEW_CRE8_TS','cast(CRE8_TS as date) as CRE8_DT'])

2. filter

df.withColumn('dummy_var',F.rand())
df.withColumn('p_cre8date', F.col('CRE8_TS').cast("date"))
df.withColumn("banned_supplier", col("supplier").isin(supplierBlacklist))
df.withColumn('avg_weekly_unts',F.avg(F.col('wkly_unts')).over(prod_str_window))
df.withColumn("warehouse_alert", when(col("product_quantity") == 0, "red").otherwise("green"))
df.withColumn(col, F.when((F.lower(col) == un_list[0]) | (F.lower(col) == un_list[1]), 'undetermined').otherwise(F.lower(col)))
df.withColumn("row_num", row_number().over(Window.partitionBy([focus_term, gb_term]).orderBy(desc('year-month'), desc('value'))))
df.withColumn("price", translate(col("price"), "$,", "").cast("double"))
 
df.where(col("heartrate") < 0)
df.where(col("p_device_id").isin([3,4])
 
df.filter((col("ShipDateKey") > 20031231) & (col("ShipDateKey") <= 20041231))
df.filter((col("missing_price") == True) | (col("banned_supplier") == True))        
df.filter((F.col(focus_term).isNull()==False) | (F.col(gb_term).isNull()==False))
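 
# A common follow-up to the row_number window above: keep only the first row per group
# (a sketch, assuming the same focus_term / gb_term columns and 'year-month' / 'value' ordering).
dedup_window = Window.partitionBy([focus_term, gb_term]).orderBy(desc("year-month"), desc("value"))
df_latest = (df.withColumn("row_num", row_number().over(dedup_window))
               .filter(F.col("row_num") == 1)
               .drop("row_num"))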

3. clean data

df.withColumnRenamed("TAX_ID", "tax_id")
df.withColumn('FW', F.coalesce(F.col('FW'), F.lit('unknown')))
 
df.dropDuplicates(["value"])
df.drop("product_quantity")
df.dropna(subset=(focus_term, gb_term))

4. aggregations

df.count()
df.orderBy(desc("load_timestamp"))
df.orderBy('ApproxWeight', ascending = False)
df.groupBy(list_g).agg(count(date_type).alias('value'))
# sum() relies on Spark's implicit cast of the string column to a numeric type; the aggregated result can then be cast to int
df.groupby('DTA_AREA_ID').agg(F.sum(F.col('SALS_QTY')).cast('int'))
df.groupBy('id').count().agg(F.max("count").alias("MAX"),F.min("count").alias("MIN"),F.avg("count").alias("AVERAGE"))

5. upsert

processedDeltaTable = DeltaTable.forPath(spark, health_tracker + "processed")
 
update_match = """
  health_tracker.time = upserts.time 
  AND 
  health_tracker.p_device_id = upserts.p_device_id
"""
 
update = { "heartrate" : "upserts.heartrate" }
 
insert = {
  "p_device_id" : "upserts.p_device_id",
  "heartrate" : "upserts.heartrate",
  "name" : "upserts.name",
  "time" : "upserts.time",
  "dte" : "upserts.dte"
}
 
(processedDeltaTable.alias("health_tracker")
 .merge(upsertsDF.alias("upserts"), update_match)
 .whenMatchedUpdate(set=update)
 .whenNotMatchedInsert(values=insert)
 .execute())
%sql
MERGE INTO health_tracker_silver                            -- the MERGE instruction is used to perform the upsert
USING upserts
 
ON health_tracker_silver.time = upserts.time AND        
   health_tracker_silver.p_device_id = upserts.p_device_id  -- ON is used to describe the MERGE condition
   
WHEN MATCHED THEN                                           -- WHEN MATCHED describes the update behavior
  UPDATE SET
  health_tracker_silver.heartrate = upserts.heartrate   
WHEN NOT MATCHED THEN                                       -- WHEN NOT MATCHED describes the insert behavior
  INSERT (name, heartrate, time, dte, p_device_id)              
  VALUES (name, heartrate, time, dte, p_device_id)

6. Delta Table

display(processedDeltaTable.history())
processedDeltaTable.delete("p_device_id = 4")
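 
# DeltaTable also supports in-place updates (a sketch against the same processedDeltaTable;
# the condition and new value are illustrative, not from the original):
processedDeltaTable.update(
  condition = "p_device_id = 2 AND heartrate < 0",
  set = {"heartrate": "abs(heartrate)"}
)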

Write

1. General

df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
df.write.format("json").save("s3a://...")
df.write.parquet("namesAndFavColors.parquet")
df.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
# For file-based data sources, it is also possible to bucket and sort or partition the output.
# Bucketing and sorting are applicable only to persistent tables.
df.write.partitionBy("favorite_color").bucketBy(42, "name").saveAsTable("people_partitioned_bucketed")
df.write.mode("overwrite").option("overwriteSchema", "true").format("delta").save(suppliersTargetDirectory)

2. Streaming

df.writeStream.format("delta").outputMode("append").partitionBy("p_cre8date").queryName("dax_shipment_stream")\
  .trigger(processingTime="30 seconds").option("checkpointLocation", checkpoint_path).start(destination_path)       
def processRow(row):
  # for now write them to log files, but this logic can easily be extended to publishing alerts
  # to a topic on Kafka or monitoring/paging service such as Ganglia or PagerDuty.
  print("ALERT from Sensors: Between {} and {}, device {} reported only {} times".format(row.start, row.end, row.deviceId, row[3]))
  
df.writeStream.outputMode("complete").foreach(processRow).start()
def stop_all_streams() -> bool:
    stopped = False
    for stream in spark.streams.active:
        stopped = True
        stream.stop()
    return stopped
  
#stop all streams
streams_stopped = stop_all_streams()
 
if streams_stopped:
    print("All streams stopped.")
else:
    print("No running streams.")
sqm = spark.streams
[q.name for q in sqm.active]
[q.stop() for q in sqm.active]
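 
# Monitoring a single query (a sketch; "dax_shipment_stream" is the queryName set in the
# writeStream example above):
query = next((q for q in spark.streams.active if q.name == "dax_shipment_stream"), None)
if query:
    print(query.status)         # current state of the query
    print(query.lastProgress)   # metrics from the most recent micro-batch
    # query.awaitTermination()  # optionally block the notebook until the stream stops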

Example

from urllib.request import urlretrieve
 
def retrieve_data(file: str) -> bool:
  """Download a file from a remote location to the driver, then move it from the driver to DBFS."""
 
  base_url = "https://files.training.databricks.com/static/data/health-tracker/"
  url = base_url + file
  driverPath = "file:/databricks/driver/" + file
  dbfsPath   = landingPath + file
  urlretrieve(url, file)
  dbutils.fs.mv(driverPath , dbfsPath)
  return True
 
def load_delta_table(file: str, delta_table_path: str) -> bool:
  "Load a parquet file as a Delta table."
  parquet_df = spark.read.format("parquet").load(landingPath + file)
  parquet_df.write.format("delta").save(delta_table_path)
  return True
 
def process_file(file_name: str, path: str,  table_name: str) -> bool:
  """
  1. retrieve file
  2. load as delta table
  3. register table in the metastore
  """
 
  retrieve_data(file_name)
  print(f"Retrieve {file_name}.")
 
  load_delta_table(file_name, path)
  print(f"Load {file_name} to {path}")
 
  spark.sql(f"""
  DROP TABLE IF EXISTS {table_name}
  """)
 
  spark.sql(f"""
  CREATE TABLE {table_name}
  USING DELTA
  LOCATION "{path}"
  """)
 
  print(f"Register {table_name} using path: {path}")