from pyspark.sql.functions import *
from delta.tables import DeltaTable
from pyspark.sql import DataFrame
import pyspark.sql.functions as F
from pyspark.sql import types as T
from pyspark.sql.functions import (col, current_timestamp, from_json,
    from_unixtime, lag, lead, lit, mean, stddev, max, regexp_replace)
from pyspark.sql.session import SparkSession
from pyspark.sql.streaming import DataStreamWriter
from pyspark.sql.window import Window
dbutils.fs.cp(tempfile,fileprefix+".csv")
dbutils.fs.put(result_path, sms_m, overwrite=True)
dbutils.fs.mv("file:/databricks/driver/health_tracker_data_2020_1.json",
health_tracker + "raw/health_tracker_data_2020_1.json")
dbutils.fs.rm(health_tracker + "processed", recurse=True)
%fs rm -r /dbacademy/DLRS/healthtracker/gold/health_tracker_user_analytics
display(dbutils.fs.ls(rawPath))
print(dbutils.fs.head(suppliersPath))
# Check that the required file (file_2020_1) is present in the raw directory (rawPath)
assert file_2020_1 in [item.name for item in dbutils.fs.ls(rawPath)], \
    "File not present in Raw Path"
# Mount using the s3a prefix
dbutils.fs.mount(bucketURL, MOUNTPOINT)
# Unmount directory if previously mounted
if MOUNTPOINT in [mnt.mountPoint for mnt in dbutils.fs.mounts()]:
    dbutils.fs.unmount(MOUNTPOINT)
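# A minimal sketch of how bucketURL and MOUNTPOINT used above might be built.
# The secret scope/key names and the bucket name are assumptions, not values from these notes.
AWS_ACCESS_KEY = dbutils.secrets.get(scope="aws", key="access_key")                        # hypothetical secret scope
AWS_SECRET_KEY = dbutils.secrets.get(scope="aws", key="secret_key").replace("/", "%2F")    # URL-encode any slashes
bucketURL = "s3a://{}:{}@{}".format(AWS_ACCESS_KEY, AWS_SECRET_KEY, "my-demo-bucket")       # placeholder bucket
MOUNTPOINT = "/mnt/s3demo"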
df = spark.read.load("examples/src/main/resources/users.json", format="json")
df = spark.read.load("examples/src/main/resources/users.csv", format="csv", sep=";", inferSchema="true", header="true")
df = spark.read.load("examples/src/main/resources/users.parquet")
# pathGlobFilter is used to only include files with file names matching the pattern
df = spark.read.load("examples/src/main/resources/dir1", format="parquet", pathGlobFilter="*.parquet")
# recursiveFileLookup is used to recursively load files; it disables partition inferring. Its default value is false.
# If the data source explicitly specifies a partitionSpec while recursiveFileLookup is true, an exception is thrown.
# To load all files recursively, you can use:
df = spark.read.format("parquet").option("recursiveFileLookup", "true").load("examples/src/main/resources/dir1")
df = spark.read.format("delta").option("versionAsOf", 0).load("examples/src/main/resources/dir1")
df = spark.read.format("xml").option("rootTag", "orders").option("rowTag", "purchase_item").option("inferSchema", "true").load(purchaseOrdersPath)
df = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
df = spark.read.table("health_tracker_processed")
df = spark.read.option("mergeSchema", "true").parquet("examples/src/main/resources/users.parquet")
df = spark.read.option("header", "true").option("inferSchema", "true").option("nullValue", "nan").csv("examples/src/main/resources/users.csv")
df = spark.read.option("inferSchema", "true")json("examples/src/main/resources/users.json")
df_stream = spark.readStream.format("text").option("maxFilesPerTrigger", 1).load(rawPath)
df_stream = (spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(inputPath)
    .withColumn("INPUT_FILE_NAME", input_file_name())     # keep the source file path
    .withColumn("PROCESSED_TIME", current_timestamp())    # add a processing timestamp at the time of processing
    .withWatermark("PROCESSED_TIME", "1 minute"))         # optional: tolerate out-of-order data within a window
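# A hedged sketch of the write side for the stream above; bronzePath and checkpointPath
# are assumed locations, not defined elsewhere in these notes.
(df_stream.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", checkpointPath)   # required for fault-tolerant streaming writes
    .start(bronzePath))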
%sql CREATE DATABASE IF NOT EXISTS dbacademy
%sql USE dbacademy
%sql SET spark.sql.shuffle.partitions=8
-- MSCK REPAIR TABLE recovers all the partitions in the directory of a table and updates the Hive metastore
%sql MSCK REPAIR TABLE health_tracker_processed
%sql DESCRIBE DETAIL health_tracker_processed
%sql DESCRIBE HISTORY health_tracker_silver
%sql DESCRIBE EXTENDED health_tracker_silver
%sql CONVERT TO DELTA parquet.`/dbacademy/DLRS/healthtracker/silver` PARTITIONED BY (p_device_id double)
%sql OPTIMIZE sc_raw_intake.raw_wims_events
%sql CACHE TABLE device_data
%sql
DROP TABLE IF EXISTS health_tracker_silver; -- ensures that if we run this again, it won't fail
CREATE TABLE health_tracker_silver
USING PARQUET
PARTITIONED BY (p_device_id) -- column used to partition the data
LOCATION "/dbacademy/DLRS/healthtracker/silver" -- location where the parquet files will be saved
AS (
  SELECT name,                                       -- query used to transform the raw data
         heartrate,
         CAST(FROM_UNIXTIME(time) AS TIMESTAMP) AS time,
         CAST(FROM_UNIXTIME(time) AS DATE) AS dte,
         device_id AS p_device_id
  FROM health_tracker_data_2020_01
)
df.select('xxxx.*')  # expand every field of a struct column (replace 'xxxx' with the struct column name)
df.select('operator').distinct()
dteWindow = Window.partitionBy("p_device_id").orderBy("dte")
df.select(col("heartrate"), col("dte"),lag(col("heartrate")).over(dteWindow).alias("prev_amt"),lead(col("heartrate")).over(dteWindow).alias("next_amt"))
df.select(from_json(col("value"), json_schema_raw).alias("outer_json"))
df.select(lit("Tibco").alias("datasource"), current_timestamp().alias("ingesttime"), "value", current_timestamp().cast("date").alias("p_ingestdate"))
df.select("dte", "time", "device_type", "heartrate", lit(None).alias("name"), "p_device_id")
df.join(productsDF, on="product_name", how="inner")
df.join(df_planned, (df_actual.ITM_ID == df_planned.Item_code) & (df_actual.p_cre8date == df_planned.Date), how='left')
df.createOrReplaceTempView("updates")
sql_query = "select count(*) as {}_count from {} where cast({} as date) = '{}'".format(stage, stage, ts_col, ts_date)
spark.sql(sql_query).show()
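# With illustrative values stage="health_tracker_processed", ts_col="dte", ts_date="2020-01-01",
# the template above resolves to:
#   select count(*) as health_tracker_processed_count from health_tracker_processed where cast(dte as date) = '2020-01-01'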
df.selectExpr(['*', 'cast(CRE8_TS as timestamp) as NEW_CRE8_TS','cast(CRE8_TS as date) as CRE8_DT'])
df.withColumn('dummy_var',F.rand())
df.withColumn('p_cre8date', F.col('CRE8_TS').cast("date"))
df.withColumn("banned_supplier", col("supplier").isin(supplierBlacklist))
df.withColumn('avg_weekly_unts',F.avg(F.col('wkly_unts')).over(prod_str_window))
df.withColumn("warehouse_alert", when(col("product_quantity") == 0, "red").otherwise("green"))
# Here `col` is a string column name (e.g. a loop variable), not pyspark.sql.functions.col
df.withColumn(col, F.when((F.lower(col) == un_list[0]) | (F.lower(col) == un_list[1]), 'undetermined').otherwise(F.lower(col)))
df.withColumn("row_num", row_number().over(Window.partitionBy([focus_term, gb_term]).orderBy(desc('year-month'), desc('value'))))
df.withColumn("price", translate(col("price"), "$,", "").cast("double"))
df.where(col("heartrate") < 0)
df.where(col("p_device_id").isin([3,4])
df.filter((col("ShipDateKey") > 20031231) & (col("ShipDateKey") <= 20041231))
df.filter((col("missing_price") == True) | (col("banned_supplier") == True))
df.filter((F.col(focus_term).isNull()==False) | (F.col(gb_term).isNull()==False))
df.count()
df.orderBy(desc("load_timestamp"))
df.orderBy('ApproxWeight', ascending = False)
df.groupBy(list_g).agg(count(date_type).alias('value'))
# Spark implicitly coerces the string column to a numeric type for sum(), so the result can be
# cast to int even if SALS_QTY is stored as a string
df.groupby('DTA_AREA_ID').agg(F.sum(F.col('SALS_QTY')).cast('int'))
df.groupBy('id').count().agg(F.max("count").alias("MAX"),F.min("count").alias("MIN"),F.avg("count").alias("AVERAGE"))
processedDeltaTable = DeltaTable.forPath(spark, health_tracker + "processed")
update_match = """
health_tracker.time = upserts.time
AND
health_tracker.p_device_id = upserts.p_device_id
"""
update = { "heartrate" : "upserts.heartrate" }
insert = {
"p_device_id" : "upserts.p_device_id",
"heartrate" : "upserts.heartrate",
"name" : "upserts.name",
"time" : "upserts.time",
"dte" : "upserts.dte"
}
(processedDeltaTable.alias("health_tracker")
.merge(upsertsDF.alias("upserts"), update_match)
.whenMatchedUpdate(set=update)
.whenNotMatchedInsert(values=insert)
.execute())
%sql
MERGE INTO health_tracker_silver -- the MERGE instruction is used to perform the upsert
USING upserts
ON health_tracker_silver.time = upserts.time AND
health_tracker_silver.p_device_id = upserts.p_device_id -- ON is used to describe the MERGE condition
WHEN MATCHED THEN -- WHEN MATCHED describes the update behavior
UPDATE SET
health_tracker_silver.heartrate = upserts.heartrate
WHEN NOT MATCHED THEN -- WHEN NOT MATCHED describes the insert behavior
INSERT (name, heartrate, time, dte, p_device_id)
VALUES (name, heartrate, time, dte, p_device_id)
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
df.write.format("json").save("s3a://...")
df.write.parquet("namesAndFavColors.parquet")
df.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
# For file-based data source, it is also possible to bucket and sort or partition the output.
# Bucketing and sorting are applicable only to persistent tables
df.write.partitionBy("favorite_color").bucketBy(42, "name").saveAsTable("people_partitioned_bucketed")
df.write.mode("overwrite").option("overwriteSchema", "true").format("delta").save(suppliersTargetDirectory)
def processRow(row):
    # For now, write alerts to log files, but this logic can easily be extended to publishing alerts
    # to a topic on Kafka or a monitoring/paging service such as Ganglia or PagerDuty.
    print("ALERT from Sensors: Between {} and {}, device {} reported only {} times".format(row.start, row.end, row.deviceId, row[3]))
df.writeStream.outputMode("complete").foreach(processRow).start()
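# A hedged sketch of the Kafka extension mentioned above, swapping foreach for foreachBatch
# so each micro-batch can be written with the Kafka sink. kafkaBootstrapServers and the
# "sensor_alerts" topic name are assumptions, not part of the original example.
def publish_alerts(batch_df, batch_id):
    (batch_df
        .selectExpr("CAST(deviceId AS STRING) AS key",   # Kafka sink expects key/value columns
                    "to_json(struct(*)) AS value")
        .write
        .format("kafka")
        .option("kafka.bootstrap.servers", kafkaBootstrapServers)
        .option("topic", "sensor_alerts")
        .save())

df.writeStream.outputMode("complete").foreachBatch(publish_alerts).start()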
from urllib.request import urlretrieve

def retrieve_data(file: str) -> bool:
    """Download file from remote location to driver. Move from driver to DBFS."""
    base_url = "https://files.training.databricks.com/static/data/health-tracker/"
    url = base_url + file
    driverPath = "file:/databricks/driver/" + file
    dbfsPath = landingPath + file
    urlretrieve(url, file)
    dbutils.fs.mv(driverPath, dbfsPath)
    return True
def load_delta_table(file: str, delta_table_path: str) -> bool:
    """Load a parquet file and write it out as a Delta table."""
    parquet_df = spark.read.format("parquet").load(landingPath + file)
    parquet_df.write.format("delta").save(delta_table_path)
    return True
def process_file(file_name: str, path: str, table_name: str) -> bool:
    """
    1. Retrieve the file
    2. Load it as a Delta table
    3. Register the table in the metastore
    """
    retrieve_data(file_name)
    print(f"Retrieved {file_name}.")

    load_delta_table(file_name, path)
    print(f"Loaded {file_name} to {path}.")

    spark.sql(f"""
        DROP TABLE IF EXISTS {table_name}
    """)
    spark.sql(f"""
        CREATE TABLE {table_name}
        USING DELTA
        LOCATION "{path}"
    """)
    print(f"Registered {table_name} using path: {path}")
    return True