class UpsertSCDSpark(BaseLoader):
    """Loader that upserts a source table into a target table with a Spark SQL ``MERGE``.

    The generated statement closes the current target row (sets the current
    flag to ``'N'``) when a compared column differs, and inserts a new row
    flagged as current for every changed or previously unseen key.

    The MERGE statement reads the source from ``dbtable``; a ``path``-only
    source is loaded by ``_get_df`` but is not referenced by the generated SQL.

    Args:
        primary_keys: Key column name, or a list/tuple of key column names.
        path: Source location handed to ``ReadSpark``.
        dbtable: Source table name, used by ``ReadSpark`` and in the MERGE SQL.
        tgt_dbtable: Target table name used in the MERGE SQL.
        update_timestamp: Target column set to ``current_timestamp()`` when a
            row is closed.
        src_eff_timestamp: SQL expression written to ``exp_ts`` on closed rows
            and to ``eff_ts`` on inserted rows.
        current_flag: Target column holding ``'Y'``/``'N'``.
        soft_delete_flag: Target column set to ``'N'`` on insert.
        delete_timestamp: Target column set to ``current_timestamp()`` on insert.
        eff_ts: Target column receiving ``src_eff_timestamp`` on insert.
        exp_ts: Target expiry column.
        columns: Optional list of columns to compare; defaults to the source
            dataframe's columns.
        format: Source format handed to ``ReadSpark``.

    Raises:
        ValueError: If both or neither of ``dbtable`` and ``path`` are given.
    """

    def __init__(
        self,
        primary_keys: Union[str, list, tuple],
        path: Optional[str] = None,
        dbtable: Optional[str] = None,
        tgt_dbtable: Optional[str] = None,
        update_timestamp: Optional[str] = None,
        src_eff_timestamp: Optional[str] = None,
        current_flag: Optional[str] = "current_flag",
        soft_delete_flag: Optional[str] = "soft_delete_flag",
        delete_timestamp: Optional[str] = "delete_ts",
        eff_ts: Optional[str] = "eff_ts",
        exp_ts: Optional[str] = "exp_ts",
        columns: Optional[list] = None,
        format: str = 'delta',
        *args,
        **kwargs
    ):
        super(UpsertSCDSpark, self).__init__(*args, **kwargs)
        self.primary_keys = primary_keys
        self.tgt_dbtable = tgt_dbtable
        self.update_timestamp = update_timestamp
        self.src_eff_timestamp = src_eff_timestamp
        self.current_flag = current_flag
        self.soft_delete_flag = soft_delete_flag
        self.delete_timestamp = delete_timestamp
        self.eff_ts = eff_ts
        self.exp_ts = exp_ts
        self.dbtable = dbtable
        self.columns_list = columns
        self.path = path
        self.format = format
        self._loader = ReadSpark
        # Exactly one source must be given: reject "both" and "neither".
        if self.dbtable and self.path:
            raise ValueError('One of `dbtable` or `path` must be specified!')
        if not self.dbtable and not self.path:
            raise ValueError('No `dbtable` or `path` specified. One must be specified!')
        self.database, self.table = self._parse_dbtable(self.dbtable)
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(self.log_level)

    def _primary_key_list(self):
        """Return ``primary_keys`` as a list, wrapping a single string key.

        Without this, a ``str`` key would be iterated character by character
        when the SQL fragments are built.
        """
        if isinstance(self.primary_keys, str):
            return [self.primary_keys]
        return list(self.primary_keys)

    def _get_df(self):
        """Load the source dataframe through ``ReadSpark``."""
        reader = ReadSpark(
            path=self.path,
            dbtable=self.dbtable,
            format=self.format,
            options=self.options,
            spark_session=self._spark_session,
        )
        return reader.load()

    def _get_cols(self, tbl_a_alias, tbl_b_alias, keys, columns):
        """Build the ``a.col<>b.col or ...`` change-detection condition.

        Args:
            tbl_a_alias: Alias prefixed to the left-hand columns.
            tbl_b_alias: Alias prefixed to the right-hand columns.
            keys: Column names excluded from the comparison.
            columns: Candidate column names.

        Returns:
            The ``" or "``-joined inequality string; empty when nothing is
            left to compare.
        """
        # NOTE(review): the trailing [:-1] drops the last non-key column from
        # the comparison; kept as in the original -- confirm this is intended.
        compared = [name for name in columns if name not in keys][:-1]
        # NOTE(review): `<>` yields NULL when either side is NULL, so changes
        # to or from NULL are not detected by this condition.
        return " or ".join(
            f"{tbl_a_alias}.{name.strip()}<>{tbl_b_alias}.{name.strip()}"
            for name in compared
        )

    def _get_scd_sql_contents(self, tbl_a_alias, tbl_b_alias, keys, tbl_src_alias="src", tbl_tgt_alias="tgt"):
        """Build the key-related SQL fragments used by the MERGE statement.

        Returns:
            A 4-tuple of strings: the ``<a>.<key> as mergeKey_<key>`` select
            list, the ``NULL as mergeKey_<key>`` select list, the a/b key join
            condition, and the src/tgt merge condition.
        """
        # Strip once so every fragment uses the same spelling of each key.
        names = [key.strip() for key in keys]
        merge_key_cols = ", ".join(
            f"{tbl_a_alias}.{name} as mergeKey_{name}" for name in names
        )
        null_merge_key_cols = ", ".join(f"NULL as mergeKey_{name}" for name in names)
        # Inner join: source row against its target row.
        key_join_conditions = " and ".join(
            f"{tbl_a_alias}.{name}={tbl_b_alias}.{name}" for name in names
        )
        # Merge condition: mergeKey of the staged source against the target key.
        merge_join_conditions = " and ".join(
            f"{tbl_src_alias}.mergeKey_{name}={tbl_tgt_alias}.{name}" for name in names
        )
        return merge_key_cols, null_merge_key_cols, key_join_conditions, merge_join_conditions

    def _merge_tables(self):
        """Compose and execute the MERGE statement against ``tgt_dbtable``."""
        keys = self._primary_key_list()
        (merge_key_cols,
         null_merge_key_cols,
         key_join_conditions,
         merge_join_conditions) = self._get_scd_sql_contents("a", "b", keys)

        df_src = self._get_df()
        source_columns = df_src.columns
        compare_columns = self.columns_list if self.columns_list else source_columns
        lower_cols = [name.lower() for name in compare_columns]
        lower_keys = [name.lower() for name in keys]

        src_tbl_alias, tgt_tbl_alias = 'src', 'tgt'
        inner_col_conditions = self._get_cols("a", "b", lower_keys, lower_cols)
        outer_col_conditions = self._get_cols(src_tbl_alias, tgt_tbl_alias, lower_keys, lower_cols)
        insert_columns = ",".join(source_columns)
        insert_values = ",".join(f"{src_tbl_alias}.{name}" for name in source_columns)

        # The staged source is the union of:
        #   * every source row keyed by its mergeKey (matches the current
        #     target row so it can be closed), and
        #   * changed rows with a NULL mergeKey (never match, so they fall
        #     into the INSERT branch as the new current version).
        # NOTE(review): inserted rows get current_timestamp() for both the
        # delete timestamp and exp_ts -- confirm this is the intended value.
        sql_merge_query = f"""
            MERGE INTO {self.tgt_dbtable} AS tgt
            USING (select {merge_key_cols}, a.*
            from {self.dbtable} a
            UNION ALL
            SELECT {null_merge_key_cols}, a.*
            FROM {self.dbtable} a JOIN {self.tgt_dbtable} b ON {key_join_conditions}
            WHERE b.{self.current_flag} = 'Y' AND {inner_col_conditions}
            ) AS src ON {merge_join_conditions}
            WHEN MATCHED and tgt.{self.current_flag} = 'Y' and {outer_col_conditions} THEN
            UPDATE SET tgt.{self.current_flag} = 'N', tgt.{self.update_timestamp} = current_timestamp(), tgt.{self.exp_ts} = {self.src_eff_timestamp}
            WHEN NOT MATCHED THEN
            INSERT ({insert_columns}, {self.current_flag}, {self.soft_delete_flag}, {self.delete_timestamp}, {self.eff_ts}, {self.exp_ts})
            VALUES ({insert_values}, 'Y', 'N',current_timestamp(), {self.src_eff_timestamp}, current_timestamp())
        """
        self._spark_session.sql(sql_merge_query)

    def load(self, df: Optional[SparkDataFrame] = None) -> SparkDataFrame:
        """Run the upsert and return the target table.

        Args:
            df: Returned unchanged when the source is empty and the upsert is
                skipped; otherwise ignored.

        Returns:
            The target table as a dataframe after a merge, or ``df`` when the
            source dataframe has no rows.
        """
        class_name = self.__class__.__name__
        # Lazy %-args: nothing is formatted unless DEBUG logging is enabled.
        self.logger.debug('%s.%s started...', class_name, 'load')
        df_src = self._get_df()
        if df_src.head():
            self._merge_tables()
            df = self._spark_session.table(f"{self.tgt_dbtable}")
        else:
            warnings.warn(
                "Upsert skipped! The input dataframe is empty."
            )
        self.logger.debug('%s.%s completed.', class_name, 'load')
        return df
Showing posts with label Spark. Show all posts
Showing posts with label Spark. Show all posts
Wednesday, August 31, 2022
Spark - SCD2 Type Update
Thursday, March 18, 2021
Databricks Spark -Cheatsheet Summary
from pyspark.sql.functions import *
from delta.tables import DeltaTable
from pyspark.sql import DataFrame
import pyspark.sql.functions as F
from pyspark.sql import types as T
from pyspark.sql.functions import (col,current_timestamp,from_json,
from_unixtime,lag,lead,lit,mean,stddev,max,regexp_replace)
from pyspark.sql.session import SparkSession
from pyspark.sql.streaming import DataStreamWriter
from pyspark.sql.window import Window
dbutils.fs.cp(tempfile,fileprefix+".csv")
dbutils.fs.put(result_path, sms_m, overwrite=True)
dbutils.fs.mv("file:/databricks/driver/health_tracker_data_2020_1.json",
health_tracker + "raw/health_tracker_data_2020_1.json")
dbutils.fs.rm(health_tracker + "processed", recurse=True)
%fs rm -r /dbacademy/DLRS/healthtracker/gold/health_tracker_user_analytics
display(dbutils.fs.ls(rawPath))
print(dbutils.fs.head(suppliersPath))
# To check the file you need (file_2020_1) is in the directory (rawPath)
assert file_2020_1 in [item.name for item in dbutils.fs.ls(rawPath)], "File not present in Raw Path"
# Mount using the s3a prefix
dbutils.fs.mount(bucketURL, MOUNTPOINT)
# Unmount directory if previously mounted
if MOUNTPOINT in [mnt.mountPoint for mnt in dbutils.fs.mounts()]:
    dbutils.fs.unmount("/mnt/s3demo")
df = spark.read.load("examples/src/main/resources/users.json", format="json")
df = spark.read.load("examples/src/main/resources/users.csv", format="csv", sep=";", inferSchema="true", header="true")
df = spark.read.load("examples/src/main/resources/users.parquet")
# pathGlobFilter is used to only include files with file names matching the pattern
df = spark.read.load("examples/src/main/resources/dir1", format="parquet", pathGlobFilter="*.parquet")
# recursiveFileLookup is used to recursively load files and it disables partition inferring. Its default value is false.
# If data source explicitly specifies the partitionSpec when recursiveFileLookup is true, exception will be thrown.
# To load all files recursively, you can use:
df = spark.read.format("parquet").option("recursiveFileLookup", "true").load("examples/src/main/resources/dir1")
df = spark.read.format("delta").option("versionAsOf", 0).load("examples/src/main/resources/dir1")
df = spark.read.format("xml").option("rootTag", "orders").option("rowTag", "purchase_item").option("inferSchema", "true").load(purchaseOrdersPath)
df = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
df = spark.read.table("health_tracker_processed")
df = spark.read.option("mergeSchema", "true").parquet("examples/src/main/resources/users.parquet")
df = spark.read.option("header", "true").option("inferSchema", "true").option("nullValue", "nan").csv("examples/src/main/resources/users.csv")
df = spark.read.option("inferSchema", "true").json("examples/src/main/resources/users.json")
df_stream = spark.readStream.format("text").option("maxFilesPerTrigger", 1).load(rawPath)
df_stream = (spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(inputPath)
    .withColumn("INPUT_FILE_NAME", input_file_name())  # maintain file path
    .withColumn("PROCESSED_TIME", current_timestamp())  # add a processing timestamp at the time of processing
    .withWatermark("PROCESSED_TIME", "1 minute"))  # optional: window for out of order data
%sql CREATE DATABASE IF NOT EXISTS dbacademy;
USE dbacademy
%sql SET spark.sql.shuffle.partitions=8
-- MSCK REPAIR TABLE recovers all the partitions in the directory of a table and updates the Hive metastore
%sql MSCK REPAIR TABLE health_tracker_processed
%sql DESCRIBE DETAIL health_tracker_processed
%sql DESCRIBE HISTORY health_tracker_silver
%sql DESCRIBE EXTENDED health_tracker_silver
%sql CONVERT TO DELTA parquet.`/dbacademy/DLRS/healthtracker/silver` PARTITIONED BY (p_device_id double)
%sql OPTIMIZE sc_raw_intake.raw_wims_events
CACHE TABLE device_data
%sql DROP TABLE IF EXISTS health_tracker_silver; -- ensures that if we run this again, it won't fail
CREATE TABLE health_tracker_silver
USING PARQUET PARTITIONED BY (p_device_id) -- column used to partition the data
LOCATION "/dbacademy/DLRS/healthtracker/silver" -- location where the parquet files will be saved
AS (
SELECT name, -- query used to transform the raw data
heartrate, CAST(FROM_UNIXTIME(time) AS TIMESTAMP) AS time,
CAST(FROM_UNIXTIME(time) AS DATE) AS dte,
device_id AS p_device_id FROM health_tracker_data_2020_01 )
df.select('xxxx.*')
df.select('operator').distinct()
# dteWindow = Window.partitionBy("p_device_id").orderBy("dte")
df.select(col("heartrate"), col("dte"),lag(col("heartrate")).over(dteWindow).alias("prev_amt"),lead(col("heartrate")).over(dteWindow).alias("next_amt"))
df.select(from_json(col("value"), json_schema_raw).alias("outer_json"))
df.select(lit("Tibco").alias("datasource"), current_timestamp().alias("ingesttime"), "value", current_timestamp().cast("date").alias("p_ingestdate"))
df.select("dte", "time", "device_type", "heartrate", lit(None).alias("name"), "p_device_id")
df.join(productsDF, on="product_name", how="inner")
df.join(df_planned, (df_actual.ITM_ID == df_planned.Item_code) & (df_actual.p_cre8date == df_planned.Date), how='left')
df.createOrReplaceTempView("updates")
sql_query = "select count(*) as {}_count from {} where cast({} as date) = {}".format(stage, stage, ts_col, ts_date)
spark.sql(sql_query).show()
df.selectExpr(['*', 'cast(CRE8_TS as timestamp) as NEW_CRE8_TS','cast(CRE8_TS as date) as CRE8_DT'])
df.withColumn('dummy_var',F.rand())
df.withColumn('p_cre8date', F.col('CRE8_TS').cast("date"))
df.withColumn("banned_supplier", col("supplier").isin(supplierBlacklist))
df.withColumn('avg_weekly_unts',F.avg(F.col('wkly_unts')).over(prod_str_window))
df.withColumn("warehouse_alert", when(col("product_quantity") == 0, "red").otherwise("green"))
df.withColumn(col, F.when((F.lower(col) == un_list[0]) | (F.lower(col) == un_list[1]), 'undetermined').otherwise(F.lower(col)))
df.withColumn("row_num", row_number().over(Window.partitionBy([focus_term, gb_term]).orderBy(desc('year-month'), desc('value'))))
df.withColumn("price", translate(col("price"), "$,", "").cast("double"))
df.where(col("heartrate") < 0)
df.where(col("p_device_id").isin([3,4]))
df.filter((col("ShipDateKey") > 20031231) & (col("ShipDateKey") <= 20041231))
df.filter((col("missing_price") == True) | (col("banned_supplier") == True))
df.filter((F.col(focus_term).isNull()==False) | (F.col(gb_term).isNull()==False))
df.count()
df.orderBy(desc("load_timestamp"))
df.orderBy('ApproxWeight', ascending = False)
df.groupBy(list_g).agg(count(date_type).alias('value'))
# because of lazy execution, you can cast as int after sum even if it is string typed
df.groupby('DTA_AREA_ID').agg(F.sum(F.col('SALS_QTY')).cast('int'))
df.groupBy('id').count().agg(F.max("count").alias("MAX"),F.min("count").alias("MIN"),F.avg("count").alias("AVERAGE"))
processedDeltaTable = DeltaTable.forPath(spark, health_tracker + "processed")
update_match = """
health_tracker.time = upserts.time AND health_tracker.p_device_id = upserts.p_device_id"""
update = { "heartrate" : "upserts.heartrate" }
insert = {
"p_device_id" : "upserts.p_device_id",
"heartrate" : "upserts.heartrate",
"name" : "upserts.name",
"time" : "upserts.time",
"dte" : "upserts.dte"
}
(processedDeltaTable.alias("health_tracker")
.merge(upsertsDF.alias("upserts"), update_match)
.whenMatchedUpdate(set=update)
.whenNotMatchedInsert(values=insert)
.execute())
%sql MERGE INTO health_tracker_silver -- the MERGE instruction is used to perform the upsert
USING upserts
ON health_tracker_silver.time = upserts.time AND
health_tracker_silver.p_device_id = upserts.p_device_id -- ON is used to describe the MERGE condition
WHEN MATCHED THEN -- WHEN MATCHED describes the update behavior
UPDATE SET health_tracker_silver.heartrate = upserts.heartrate
WHEN NOT MATCHED THEN -- WHEN NOT MATCHED describes the insert behavior
INSERT (name, heartrate, time, dte, p_device_id)
VALUES (name, heartrate, time, dte, p_device_id)
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
df.write.format("json").save("s3a://...")
df.write.parquet("namesAndFavColors.parquet")
df.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
# For file-based data source, it is also possible to bucket and sort or partition the output.
# Bucketing and sorting are applicable only to persistent tables
df.write.partitionBy("favorite_color").bucketBy(42, "name").saveAsTable("people_partitioned_bucketed")
df.write.mode("overwrite").option("overwriteSchema", "true").format("delta").save(suppliersTargetDirectory)
def processRow(row):
    """Print an alert line for one row.

    Args:
        row: Object exposing ``start``, ``end`` and ``deviceId`` attributes
            and a value at index 3 (printed as the report count).
    """
    # For now write them to log files, but this logic can easily be extended to publishing alerts
    # to a topic on Kafka or monitoring/paging service such as Ganglia or PagerDuty.
    print("ALERT from Sensors: Between {} and {}, device {} reported only {} times".format(row.start, row.end, row.deviceId, row[3]))
df.writeStream.outputMode("complete").foreach(processRow).start()
def retrieve_data(file: str) -> bool:
    """Download file from remote location to driver. Move from driver to DBFS.

    Args:
        file: File name appended to the base URL, the driver path and
            ``landingPath``.

    Returns:
        Always ``True`` once the download and move calls have returned.
    """
    base_url = "https://files.training.databricks.com/static/data/health-tracker/"
    url = base_url + file
    driverPath = "file:/databricks/driver/" + file
    dbfsPath = landingPath + file
    # Download under the bare file name, then move it from the driver path.
    urlretrieve(url, file)
    dbutils.fs.mv(driverPath, dbfsPath)
    return True
def load_delta_table(file: str, delta_table_path: str) -> bool:
    """Load a parquet file as a Delta table.

    Args:
        file: File name appended to ``landingPath`` to locate the parquet input.
        delta_table_path: Destination path for the Delta-format write.

    Returns:
        Always ``True`` once the write call has returned.
    """
    parquet_df = spark.read.format("parquet").load(landingPath + file)
    parquet_df.write.format("delta").save(delta_table_path)
    return True
def process_file(file_name: str, path: str, table_name: str) -> bool:
    """Retrieve a file, load it as a Delta table and register it.

    Steps:
        1. retrieve file
        2. load as delta table
        3. register table in the metastore

    Args:
        file_name: File name passed to ``retrieve_data`` and ``load_delta_table``.
        path: Delta table path, also used as the table ``LOCATION``.
        table_name: Name of the table to drop and re-create.
    """
    retrieve_data(file_name)
    print(f"Retrieve {file_name}.")

    load_delta_table(file_name, path)
    print(f"Load {file_name} to {path}")

    # Drop any earlier registration so the CREATE below does not fail.
    spark.sql(f"""
        DROP TABLE IF EXISTS {table_name}
    """)
    spark.sql(f"""
        CREATE TABLE {table_name}
        USING DELTA
        LOCATION "{path}"
    """)
    print(f"Register {table_name} using path: {path}")
Subscribe to:
Posts (Atom)
Import