Showing posts with label Databricks. Show all posts

Thursday, September 21, 2023

PySpark SQL - write out records with time across multiple days

 Example:

From A:

To B:

Logic:

1. Use sequence() to list out the interval dates between the start and end date.

2. Use explode() to create duplicate rows with the interval dates.

3. Use lag() and row_number() to create the extra date columns needed for the next step.

4. Use a CASE statement to pick the start and end values for each row, using the row numbers to identify the first and last rows.

Code:

with tbl_explode as (
    -- one row per calendar day covered by the interval
    select *
         , explode(sequence(to_date(dateadd(day, 1, StartDateTime)), to_date(dateadd(day, 1, EndDateTime)), interval 1 day)) as interval_date
    from tbl_1
    where datediff(day, StartDateTime, EndDateTime) > 0 or cast(StartDateTime as date) <> cast(EndDateTime as date)
),
tbl_time_split as (
    select CASE WHEN row_no_min = 1 THEN StartDateTime
                ELSE to_timestamp(lag_date) END AS new_StartDateTime
         , CASE WHEN row_no_max = 1 THEN to_timestamp(EndDateTime)
                ELSE to_timestamp(interval_date) END AS new_EndDateTime
         , *
    from (
        select *
             , lag(interval_date, 1) OVER (PARTITION BY WO ORDER BY interval_date) as lag_date
             , row_number() OVER (PARTITION BY WO, EventDate ORDER BY interval_date) as row_no_min
             , row_number() OVER (PARTITION BY WO, EventDate ORDER BY interval_date desc) as row_no_max
        from tbl_explode
    ) d
)
select * from tbl_time_split
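
The same splitting idea can be sketched in plain Python, which may help when checking the SQL output by hand. This is only an illustration under assumptions: the function name split_by_day and the sample timestamps are made up, and it works on a single start/end pair rather than a table.

```python
from datetime import datetime, time, timedelta
from typing import List, Tuple

def split_by_day(start: datetime, end: datetime) -> List[Tuple[datetime, datetime]]:
    """Split [start, end] into consecutive pieces that each stay within one calendar day."""
    pieces = []
    piece_start = start
    while piece_start.date() < end.date():
        # Midnight at the start of the following day closes the current piece.
        next_midnight = datetime.combine(piece_start.date() + timedelta(days=1), time.min)
        pieces.append((piece_start, next_midnight))
        piece_start = next_midnight
    # The last piece keeps the original end time.
    pieces.append((piece_start, end))
    return pieces

rows = split_by_day(datetime(2023, 9, 19, 22, 0), datetime(2023, 9, 21, 3, 30))
for new_start, new_end in rows:
    print(new_start, "->", new_end)
```

The first piece keeps the original start time and the last piece keeps the original end time, which is the role of the two CASE expressions in the query.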

Wednesday, June 28, 2023

Databricks - some useful functions

Function to drop duplicate columns after joining DataFrames

def dropDupeDfCols(df):
    newcols = []
    dupcols = []

    # Record the position of every column whose name has already been seen
    for i in range(len(df.columns)):
        if df.columns[i] not in newcols:
            newcols.append(df.columns[i])
        else:
            dupcols.append(i)

    # Rename columns to their positions so duplicates can be dropped unambiguously
    df = df.toDF(*[str(i) for i in range(len(df.columns))])
    for dupcol in dupcols:
        df = df.drop(str(dupcol))

    return df.toDF(*newcols)

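Function to flatten a nested list

def removeNestings(l, output=None):
    # Collect the flattened items in one list that is passed down the recursion
    if output is None:
        output = []
    for i in l:
        if isinstance(i, list):
            removeNestings(i, output)
        else:
            output.append(i)
    return output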

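Function to dynamically add columns with null values

from functools import reduce
from pyspark.sql.functions import lit

def add_col(df, cl):
    # lit(None) creates a real NULL; lit('null') would store the string 'null'
    return df.withColumn(cl, lit(None).cast('string'))

# new_col_list: list of column names to add; targetDF: the DataFrame to extend
test_df = reduce(add_col, new_col_list, targetDF)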

Tuesday, May 9, 2023

Databricks - DLT some notes



When to use views, materialized views, and streaming tables

To ensure your pipelines are efficient and maintainable, choose the best dataset type when you implement your pipeline queries.

Consider using a view when:

  • You have a large or complex query that you want to break into easier-to-manage queries.

  • You want to validate intermediate results using expectations.

  • You want to reduce storage and compute costs and do not require the materialization of query results. Because tables are materialized, they require additional computation and storage resources.

Consider using a materialized view when:

  • Multiple downstream queries consume the table. Because views are computed on demand, the view is re-computed every time the view is queried.

  • The table is consumed by other pipelines, jobs, or queries. Because views are not materialized, you can only use them in the same pipeline.

  • You want to view the results of a query during development. Because tables are materialized and can be viewed and queried outside of the pipeline, using tables during development can help validate the correctness of computations. After validating, convert queries that do not require materialization into views.

Consider using a streaming table when:

  • A query is defined against a data source that is continuously or incrementally growing.

  • Query results should be computed incrementally.

  • High throughput and low latency are desired for the pipeline.

2

 By default, Delta Live Tables recomputes table results based on input data each time a pipeline is updated, so you need to make sure the deleted record isn’t reloaded from the source data. Setting the pipelines.reset.allowed table property to false prevents refreshes to a table, but does not prevent incremental writes to the tables or prevent new data from flowing into the table.

3

By contrast, the final tables in a pipeline, commonly referred to as gold tables, often require complicated aggregations or read from sources that are the targets of an APPLY CHANGES INTO operation. Because these operations inherently create updates rather than appends, they are not supported as inputs to streaming tables. These transformations are better suited for materialized views. By mixing streaming tables and materialized views into a single pipeline, you can simplify your pipeline and avoid costly re-ingestion or re-processing of raw data and have the full power of SQL to compute complex aggregations over an efficiently encoded and filtered dataset.







1

Limitations

  • Metrics for the target table, such as number of output rows, are not available.

  • SCD type 2 updates will add a history row for every input row, even if no columns have changed.

  • The target of the APPLY CHANGES INTO query or apply_changes function cannot be used as a source for a streaming table. A table that reads from the target of an APPLY CHANGES INTO query or apply_changes function must be a live table.

  • Expectations are not supported in an APPLY CHANGES INTO query or apply_changes() function. To use expectations for the source or target dataset:

    • Add expectations on source data by defining an intermediate table with the required expectations and use this dataset as the source for the target table.

    • Add expectations on target data with a downstream table that reads input data from the target table.


1

In Python, Delta Live Tables determines whether to update a dataset as a materialized view or streaming table based on the defining query. The @table decorator is used to define both materialized views and streaming tables.

To define a materialized view in Python, apply @table to a query that performs a static read against a data source. To define a streaming table, apply @table to a query that performs a streaming read against a data source. Both dataset types have the same syntax.

2

For tables less than 1 TB in size, Databricks recommends letting Delta Live Tables control data organization. Unless you expect your table to grow beyond a terabyte, you should generally not specify partition columns.


3

Configure a streaming table to ignore changes in a source streaming table

Note

  • To use the skipChangeCommits flag, you must select the Preview channel in your pipeline settings.

  • The skipChangeCommits flag works only with spark.readStream using the option() function. You cannot use this flag in a dlt.read_stream() function.


By default, streaming tables require append-only sources. When a streaming table uses another streaming table as a source, and the source streaming table requires updates or deletes, for example, GDPR “right to be forgotten” processing, the skipChangeCommits flag can be set on the target streaming table to ignore those changes.
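
As a rough illustration of the note above, the flag is passed as a reader option on spark.readStream. The helper name read_skipping_change_commits and the table name in the usage comment are placeholders that do not come from these notes, and spark is assumed to be the active SparkSession.

```python
def read_skipping_change_commits(spark, source_table: str):
    """Return a streaming DataFrame that skips commits which update or delete source rows."""
    return (
        spark.readStream
        .option("skipChangeCommits", "true")  # set through option(), as the note requires
        .table(source_table)
    )

# Hypothetical usage inside a pipeline, where `spark` is the active SparkSession:
# df = read_skipping_change_commits(spark, "LIVE.source_streaming_table")
```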



1


Note

In Databricks Runtime 12.1 and above, skipChangeCommits deprecates the previous setting ignoreChanges. In Databricks Runtime 12.0 and lower, ignoreChanges is the only supported option.

The semantics for ignoreChanges differ greatly from skipChangeCommits. With ignoreChanges enabled, rewritten data files in the source table are re-emitted after a data changing operation such as UPDATE, MERGE INTO, DELETE (within partitions), or OVERWRITE. Unchanged rows are often emitted alongside new rows, so downstream consumers must be able to handle duplicates. Deletes are not propagated downstream. ignoreChanges subsumes ignoreDeletes.

skipChangeCommits disregards file changing operations entirely. Data files that are rewritten in the source table due to a data changing operation such as UPDATE, MERGE INTO, DELETE, and OVERWRITE are ignored entirely. In order to reflect changes in upstream source tables, you must implement separate logic to propagate these changes.



Tuesday, December 6, 2022

Databricks - understand your databases/tables

To get the databases you have tables in and how many tables in each database:

# Replace 'your_data_lake_location' with the storage path to match
databases = [db['databaseName'] for db in spark.sql('show databases').collect()]
tbl_count_list = []

for db_rows in databases:
  print(db_rows)
  final_table_list = []
  table_list = [row['database'] + '.' + row['tableName'] for row in spark.sql(f'show tables in {db_rows}').collect()]
  count = 0
  for table in table_list:
    try:
      if 'your_data_lake_location' in spark.sql(f'describe detail {table}').collect()[0]['location']:
        count += 1
        final_table_list.append((db_rows, table))
    except Exception:
      # Typically raised when the table location cannot be accessed
      print('403 forbidden')

  print(final_table_list)
  tbl_count_list.append((db_rows, count))

print(tbl_count_list)

With Unity Catalog, the external locations can also be listed directly:

%sql
SHOW EXTERNAL LOCATIONS

Wednesday, October 13, 2021

PySpark - check if directory exists

def path_exists(path):
    # spark is a SparkSession
    sc = spark.sparkContext
    # Build the filesystem from the bucket part of the path, e.g. "s3://bucket"
    fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(
        sc._jvm.java.net.URI.create("s3://" + path.split("/")[2]),
        sc._jsc.hadoopConfiguration(),
    )
    return fs.exists(sc._jvm.org.apache.hadoop.fs.Path(path))
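
The path.split("/")[2] expression above assumes a path of the form s3://bucket/key. A minimal sketch of the same extraction with the standard library is shown here. filesystem_root is a hypothetical helper name and the bucket is made up; unlike the function above, it keeps whatever scheme the path carries instead of hardcoding s3://.

```python
from urllib.parse import urlparse

def filesystem_root(path: str) -> str:
    """Return the scheme and bucket part of a path such as s3://bucket/dir/file."""
    parsed = urlparse(path)
    if not parsed.scheme or not parsed.netloc:
        raise ValueError(f"expected a URI like s3://bucket/key, got: {path!r}")
    return f"{parsed.scheme}://{parsed.netloc}"

example = "s3://my-bucket/raw/2021/10/13/"
# Same bucket that example.split("/")[2] picks out
print(filesystem_root(example))
```

Failing early on a path without a scheme avoids building a filesystem URI from the wrong path segment.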

Wednesday, July 21, 2021

Databricks SQL - extract data from nested JSON text

 

%sql
select distinct left(v2.inner_key, locate('_', v2.inner_key) - 1) as some_alias
from db_name.tbl_name
     LATERAL VIEW json_tuple(tbl_name.nested_json_col_name, 'outer_key') v1 as outer_key
     LATERAL VIEW json_tuple(v1.outer_key, 'inner_key') v2 as inner_key
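
For a quick check of the expected result on a single value, the same extraction can be sketched in plain Python. The function name and the sample JSON string are made up for illustration; the key names follow the placeholders in the query.

```python
import json

def prefix_of_inner_key(nested_json: str) -> str:
    """Return the part of outer_key.inner_key that precedes the first underscore."""
    outer = json.loads(nested_json)["outer_key"]
    # json.loads parses the whole document at once, so the second
    # json_tuple step becomes a plain dictionary lookup here.
    inner_value = outer["inner_key"]
    # Mirrors left(x, locate('_', x) - 1): empty string when there is no underscore.
    position = inner_value.find("_")
    return inner_value[:position] if position > 0 else ""

sample = '{"outer_key": {"inner_key": "abc_123"}}'
print(prefix_of_inner_key(sample))
```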


Monday, June 14, 2021

Datadog event - function to capture notebook exception

This function can be called from any "except Exception" block, for example:

try:
    ## any code logic
    pass
except Exception as e:
    context_str = dbutils.notebook.entry_point.getDbutils().notebook().getContext().toJson()
    DatabricksNBRuntimeError(context_str)

The functions are:

import sys
import json
import traceback

from datadog import initialize, api

api_key = dbutils.secrets.get(scope="datadog", key="datadog_api_key")
app_key = dbutils.secrets.get(scope="datadog", key="datadog_app_key")

options = {
    "api_key": api_key,
    "app_key": app_key
}

initialize(**options)

# datadog_identify_string, datadog_tags and find_between() are used below
# and must be defined before these functions run.

def datadog_event(notebook_path, job_name, user):
    exc_type, exc_value, exc_tb = sys.exc_info()
    traceback_error = "".join(traceback.format_exception(exc_type, exc_value, exc_tb))
    # Prefer the text after "Exception" or "Error"; fall back to the full traceback
    error_case_list = [find_between(traceback_error, "Exception", "\n\tat"), find_between(traceback_error, "Error", "\n\tat"), traceback_error]
    error_message = next(error for error in error_case_list if error != '')

    error_dict = {"notebook_metadata": {"job_name": job_name, "notebook_path": notebook_path, "user": user}, "error": error_message}
    error_message = json.dumps(error_dict)

    title = datadog_identify_string
    if job_name:
        title = datadog_identify_string + ': job( ' + job_name + ' )'

    text = error_message

    print(title, text)
    api.Event.create(title=title, text=text, tags=datadog_tags, priority='Low')

def exit_notebook(job_name, user):
    # The exception currently being handled
    e = sys.exc_info()[1]
    error_case_list = [find_between(str(e), "Exception", "\n\tat"), find_between(str(e), "Error", "\n\tat"), str(e)[:200]]
    error_message = next(error for error in error_case_list if error != '')
    output = {"status": "FAIL", "notebook_metadata": {"job_name": job_name, "user": user}, "message": error_message}

    dbutils.notebook.exit(json.dumps(output))

class DatabricksNBRuntimeError(Exception):

    def __init__(self, context_str, *args, **kwargs):
        super().__init__(*args)
        self.strerror = args
        self.args = args

        notebook_metadata = json.loads(context_str)

        job_name = None
        user = None
        notebook_path = notebook_metadata['extraContext']['notebook_path']
        if 'jobName' in notebook_metadata['tags']:
            job_name = notebook_metadata['tags']['jobName']
        if 'user' in notebook_metadata['tags']:
            user = notebook_metadata['tags']['user']

        # Report to Datadog first, then stop the notebook with a FAIL status
        datadog_event(notebook_path, job_name, user)
        exit_notebook(job_name, user)
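
The functions above call find_between(), which is not included in this post. A minimal sketch of such a helper follows. It assumes the helper returns the text between the first occurrence of one marker and the next occurrence of another, and an empty string when either marker is missing, which is what the error != '' checks rely on. The sample error text is made up.

```python
def find_between(text: str, first: str, last: str) -> str:
    """Return the text between `first` and the next `last`, or '' if either is missing."""
    start = text.find(first)
    if start == -1:
        return ""
    start += len(first)
    end = text.find(last, start)
    if end == -1:
        return ""
    return text[start:end]

# Made-up error text in the shape of a JVM stack trace
sample = "java.lang.IllegalStateException: table not found\n\tat org.example.Reader.load"
print(find_between(sample, "Exception", "\n\tat"))
```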

Thursday, March 18, 2021

Databricks Spark -Cheatsheet Summary

 

Import

dbutils.library.installPyPI("xlsxwriter")
from pyspark.sql.functions import *
from delta.tables import DeltaTable
from pyspark.sql import DataFrame
import pyspark.sql.functions as F
from pyspark.sql import types as T
from pyspark.sql.functions import (col, current_timestamp, from_json,
    from_unixtime, lag, lead, lit, mean, stddev, max, regexp_replace)
from pyspark.sql.session import SparkSession
from pyspark.sql.streaming import DataStreamWriter
from pyspark.sql.window import Window

Files

from delta.tables import *
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")  
# bypass the retention duration check so that vacuum(0) is allowed
deltaTable = DeltaTable.forPath(spark, deltaPath)
deltaTable.vacuum(0)
dbutils.fs.cp(tempfile,fileprefix+".csv")
dbutils.fs.put(result_path, sms_m, overwrite=True)
dbutils.fs.mv("file:/databricks/driver/health_tracker_data_2020_1.json", 
health_tracker + "raw/health_tracker_data_2020_1.json")
dbutils.fs.rm(health_tracker + "processed", recurse=True)
%fs rm -r /dbacademy/DLRS/healthtracker/gold/health_tracker_user_analytics
display(dbutils.fs.ls(rawPath))
print(dbutils.fs.head(suppliersPath))
 
# To check the file you need(file_2020_1) is in the directory(rawPath)
assert file_2020_1 in [item.name for item in dbutils.fs.ls(rawPath)], "File not present in Raw Path"
 
# Unmount directory if previously mounted
if MOUNTPOINT in [mnt.mountPoint for mnt in dbutils.fs.mounts()]:
  dbutils.fs.unmount(MOUNTPOINT)
# Mount using the s3a prefix
dbutils.fs.mount(bucketURL, MOUNTPOINT)

Load

1. General

df = spark.read.load("examples/src/main/resources/users.json", format="json")
df = spark.read.load("examples/src/main/resources/users.csv", format="csv", sep=";", inferSchema="true", header="true")
df = spark.read.load("examples/src/main/resources/users.parquet")
# pathGlobFilter is used to only include files with file names matching the pattern
df = spark.read.load("examples/src/main/resources/dir1", format="parquet", pathGlobFilter="*.parquet")
# recursiveFileLookup is used to recursively load files and it disables partition inferring. Its default value is false. 
# If data source explicitly specifies the partitionSpec when recursiveFileLookup is true, exception will be thrown. 
# To load all files recursively, you can use:
df = spark.read.format("parquet").option("recursiveFileLookup", "true").load("examples/src/main/resources/dir1")
df = spark.read.format("delta").option("versionAsOf", 0).load("examples/src/main/resources/dir1")
df = spark.read.format("xml").option("rootTag", "orders").option("rowTag", "purchase_item").option("inferSchema", "true").load(purchaseOrdersPath)
 
df = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
df = spark.read.table("health_tracker_processed")
 
df = spark.read.option("mergeSchema", "true").parquet("examples/src/main/resources/users.parquet")
df = spark.read.option("header", "true").option("inferSchema", "true").option("nullValue", "nan").csv("examples/src/main/resources/users.csv")
df = spark.read.option("inferSchema", "true").json("examples/src/main/resources/users.json")

2. Streaming

df_stream = spark.readStream.format("text").option("maxFilesPerTrigger", 1).load(rawPath)
df_stream = (spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(inputPath)
                            .withColumn("INPUT_FILE_NAME", input_file_name())    # maintain file path
                            .withColumn("PROCESSED_TIME", current_timestamp())   # add a processing timestamp at the time of processing
                            .withWatermark("PROCESSED_TIME", "1 minute"))        # optional: window for out of order data

3. SQL

%sql CREATE DATABASE IF NOT EXISTS dbacademy; USE dbacademy
%sql SET spark.sql.shuffle.partitions=8
--MSCK REPAIR TABLE recovers all the partitions in the directory of a table and updates the Hive metastore
%sql MSCK REPAIR TABLE health_tracker_processed
%sql DESCRIBE DETAIL health_tracker_processed
%sql DESCRIBE HISTORY health_tracker_silver
%sql DESCRIBE EXTENDED health_tracker_silver
%sql CONVERT TO DELTA parquet.`/dbacademy/DLRS/healthtracker/silver` PARTITIONED BY (p_device_id double)
%sql OPTIMIZE sc_raw_intake.raw_wims_events
%sql CACHE TABLE device_data
%sql 
SELECT  dc_id,device_type, temps,TRANSFORM (temps, t -> ((t * 9) div 5) + 32 ) AS `temps_F`
FROM device_data;
%sql
SELECT dc_id, device_type, co2_level, REDUCE(co2_level, 0, (acc, c) -> acc + c, acc -> (acc div size(co2_level))) as average_co2_level
  FROM device_data
  SORT BY average_co2_level DESC;
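
For readers more used to Python, the two higher-order SQL functions above behave roughly like a list comprehension and functools.reduce. The readings below are made-up sample values, and integer division (//) stands in for SQL's div, which matches only for non-negative numbers.

```python
from functools import reduce

temps = [35, 36, 27, 20]          # made-up Celsius readings
co2_level = [1475, 1476, 1473]    # made-up CO2 readings

# TRANSFORM: apply the lambda to every element of the array
temps_f = [((t * 9) // 5) + 32 for t in temps]

# REDUCE: fold the array into a sum starting from 0,
# then apply the finish step (divide by the number of elements)
total = reduce(lambda acc, c: acc + c, co2_level, 0)
average_co2_level = total // len(co2_level)

print(temps_f)
print(average_co2_level)
```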
%sql 
CREATE OR REPLACE TEMPORARY VIEW broken_readings
AS (
  SELECT COUNT(*) as broken_readings_count, dte FROM health_tracker_silver
  WHERE heartrate < 0
  GROUP BY dte
  ORDER BY dte
)
%sql
INSERT INTO health_tracker_silver
SELECT name,
       heartrate,
       CAST(FROM_UNIXTIME(time) AS TIMESTAMP) AS time,
       CAST(FROM_UNIXTIME(time) AS DATE) AS dte,
       device_id as p_device_id
FROM health_tracker_data_2020_02
%sql
DROP TABLE IF EXISTS health_tracker_silver;               -- ensures that if we run this again, it won't fail                                                        
CREATE TABLE health_tracker_silver                        
USING PARQUET                                             
PARTITIONED BY (p_device_id)                              -- column used to partition the data
LOCATION "/dbacademy/DLRS/healthtracker/silver"           -- location where the parquet files will be saved
AS (                                                      
  SELECT name,                                            -- query used to transform the raw data
         heartrate,                                       
         CAST(FROM_UNIXTIME(time) AS TIMESTAMP) AS time,  
         CAST(FROM_UNIXTIME(time) AS DATE) AS dte,        
         device_id AS p_device_id                         
  FROM health_tracker_data_2020_01   
)
spark.sql(
    """
DROP TABLE IF EXISTS xxx.xxxx
"""
)
 
spark.sql(
    f"""
CREATE TABLE xxx.xxx
USING DELTA
LOCATION "{destination_path}"
"""
)
spark.sql(f"CREATE DATABASE IF NOT EXISTS {cleaned_username}")
spark.sql(f"USE {cleaned_username}")
 
df.write.format("delta").mode("overwrite").saveAsTable("deltaReview")

Transform

1. select

df.select('xxxx.*')
df.select('operator').distinct()
dteWindow = Window.partitionBy("p_device_id").orderBy("dte")
df.select(col("heartrate"), col("dte"),lag(col("heartrate")).over(dteWindow).alias("prev_amt"),lead(col("heartrate")).over(dteWindow).alias("next_amt"))
df.select(from_json(col("value"), json_schema_raw).alias("outer_json"))
df.select(lit("Tibco").alias("datasource"), current_timestamp().alias("ingesttime"), "value", current_timestamp().cast("date").alias("p_ingestdate"))
df.select("dte", "time", "device_type", "heartrate", lit(None).alias("name"), "p_device_id")
 
df.join(productsDF, on="product_name", how="inner")
df.join(df_planned, (df_actual.ITM_ID == df_planned.Item_code) & (df_actual.p_cre8date == df_planned.Date), how='left') 
 
df.createOrReplaceTempView("updates")
sql_query = "select count(*) as {}_count from {} where cast({} as date) = {}".format(stage, stage, ts_col, ts_date) 
spark.sql(sql_query).show()
 
df.selectExpr(['*',  'cast(CRE8_TS as timestamp) as NEW_CRE8_TS','cast(CRE8_TS as date) as CRE8_DT'])

2. withColumn and filter

df.withColumn('dummy_var',F.rand())
df.withColumn('p_cre8date', F.col('CRE8_TS').cast("date"))
df.withColumn("banned_supplier", col("supplier").isin(supplierBlacklist))
df.withColumn('avg_weekly_unts',F.avg(F.col('wkly_unts')).over(prod_str_window))
df.withColumn("warehouse_alert", when(col("product_quantity") == 0, "red").otherwise("green"))
df.withColumn(col, F.when((F.lower(col) == un_list[0]) | (F.lower(col) == un_list[1]), 'undetermined').otherwise(F.lower(col)))
df.withColumn("row_num", row_number().over(Window.partitionBy([focus_term, gb_term]).orderBy(desc('year-month'), desc('value'))))
df.withColumn("price", translate(col("price"), "$,", "").cast("double"))
 
df.where(col("heartrate") < 0)
df.where(col("p_device_id").isin([3,4]))
 
df.filter((col("ShipDateKey") > 20031231) & (col("ShipDateKey") <= 20041231))
df.filter((col("missing_price") == True) | (col("banned_supplier") == True))        
df.filter(F.col(focus_term).isNotNull() | F.col(gb_term).isNotNull())

3. clean data

df.withColumnRenamed("TAX_ID", "tax_id")
df.withColumn('FW', F.coalesce(F.col('FW'), F.lit('unknown')))
 
df.dropDuplicates(["value"])
df.drop("product_quantity")
df.dropna(subset=(focus_term, gb_term))

4. aggregations

df.count()
df.orderBy(desc("load_timestamp"))
df.orderBy('ApproxWeight', ascending = False)
df.groupBy(list_g).agg(count(date_type).alias('value'))
# sum() implicitly casts a string column to a numeric type, so the result can still be cast to int afterwards
df.groupby('DTA_AREA_ID').agg(F.sum(F.col('SALS_QTY')).cast('int'))
df.groupBy('id').count().agg(F.max("count").alias("MAX"),F.min("count").alias("MIN"),F.avg("count").alias("AVERAGE"))

5. upsert

processedDeltaTable = DeltaTable.forPath(spark, health_tracker + "processed")
 
update_match = """
  health_tracker.time = upserts.time 
  AND 
  health_tracker.p_device_id = upserts.p_device_id
"""
 
update = { "heartrate" : "upserts.heartrate" }
 
insert = {
  "p_device_id" : "upserts.p_device_id",
  "heartrate" : "upserts.heartrate",
  "name" : "upserts.name",
  "time" : "upserts.time",
  "dte" : "upserts.dte"
}
 
(processedDeltaTable.alias("health_tracker")
 .merge(upsertsDF.alias("upserts"), update_match)
 .whenMatchedUpdate(set=update)
 .whenNotMatchedInsert(values=insert)
 .execute())
%sql
MERGE INTO health_tracker_silver                            -- the MERGE instruction is used to perform the upsert
USING upserts
 
ON health_tracker_silver.time = upserts.time AND        
   health_tracker_silver.p_device_id = upserts.p_device_id  -- ON is used to describe the MERGE condition
   
WHEN MATCHED THEN                                           -- WHEN MATCHED describes the update behavior
  UPDATE SET
  health_tracker_silver.heartrate = upserts.heartrate   
WHEN NOT MATCHED THEN                                       -- WHEN NOT MATCHED describes the insert behavior
  INSERT (name, heartrate, time, dte, p_device_id)              
  VALUES (name, heartrate, time, dte, p_device_id)
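
The matched and not-matched branches of the two upserts above can be pictured with plain Python dictionaries keyed by the merge condition (time, p_device_id). This is only a conceptual sketch with made-up rows and a hypothetical upsert function, not how Delta Lake executes a merge.

```python
def upsert(target: dict, updates: list) -> dict:
    """Apply MERGE-style logic: update heartrate on a key match, insert otherwise."""
    for row in updates:
        key = (row["time"], row["p_device_id"])   # the ON condition
        if key in target:
            # WHEN MATCHED: only heartrate is updated
            target[key]["heartrate"] = row["heartrate"]
        else:
            # WHEN NOT MATCHED: insert the full row
            target[key] = dict(row)
    return target

# Made-up rows: one broken reading in the target, one correction and one new row in the updates
target = {
    ("2020-02-01 10:00:00", 4): {"name": "Sam", "heartrate": -55.0, "time": "2020-02-01 10:00:00",
                                 "dte": "2020-02-01", "p_device_id": 4},
}
updates = [
    {"name": "Sam", "heartrate": 61.5, "time": "2020-02-01 10:00:00", "dte": "2020-02-01", "p_device_id": 4},
    {"name": "Sam", "heartrate": 63.0, "time": "2020-02-01 11:00:00", "dte": "2020-02-01", "p_device_id": 4},
]

result = upsert(target, updates)
for key, row in result.items():
    print(key, row["heartrate"])
```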

6. Delta Table

display(processedDeltaTable.history())
processedDeltaTable.delete("p_device_id = 4")

Write

1. General

df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
df.write.format("json").save("s3a://...")
df.write.parquet("namesAndFavColors.parquet")
df.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
# For file-based data source, it is also possible to bucket and sort or partition the output. 
# Bucketing and sorting are applicable only to persistent tables
df.write.partitionBy("favorite_color").bucketBy(42, "name").saveAsTable("people_partitioned_bucketed")
df.write.mode("overwrite").option("overwriteSchema", "true").format("delta").save(suppliersTargetDirectory)

2. Streaming

df.writeStream.format("delta").outputMode("append").partitionBy("p_cre8date").queryName("dax_shipment_stream")\
  .trigger(processingTime="30 seconds").option("checkpointLocation", checkpoint_path).start(destination_path)       
def processRow(row):
  # for now write them to log files, but this logic can easily be extended to publishing alerts
  # to a topic on Kafka or monitoring/paging service such as Ganglia or PagerDuty.
  print("ALERT from Sensors: Between {} and {}, device {} reported only {} times".format(row.start, row.end, row.deviceId, row[3]))
  
df.writeStream.outputMode("complete").foreach(processRow).start()
def stop_all_streams() -> bool:
    stopped = False
    for stream in spark.streams.active:
        stopped = True
        stream.stop()
    return stopped
  
#stop all streams
streams_stopped = stop_all_streams()
 
if streams_stopped:
    print("All streams stopped.")
else:
    print("No running streams.")
sqm = spark.streams
[q.name for q in sqm.active]
[q.stop() for q in sqm.active]

Example

from urllib.request import urlretrieve

def retrieve_data(file: str) -> bool:
  """Download file from remote location to driver. Move from driver to DBFS."""
 
  base_url = "https://files.training.databricks.com/static/data/health-tracker/"
  url = base_url + file
  driverPath = "file:/databricks/driver/" + file
  dbfsPath   = landingPath + file
  urlretrieve(url, file)
  dbutils.fs.mv(driverPath , dbfsPath)
  return True
 
def load_delta_table(file: str, delta_table_path: str) -> bool:
  "Load a parquet file as a Delta table."
  parquet_df = spark.read.format("parquet").load(landingPath + file)
  parquet_df.write.format("delta").save(delta_table_path)
  return True
 
def process_file(file_name: str, path: str,  table_name: str) -> bool:
  """
  1. retrieve file
  2. load as delta table
  3. register table in the metastore
  """
 
  retrieve_data(file_name)
  print(f"Retrieve {file_name}.")
 
  load_delta_table(file_name, path)
  print(f"Load {file_name} to {path}")
 
  spark.sql(f"""
  DROP TABLE IF EXISTS {table_name}
  """)
 
  spark.sql(f"""
  CREATE TABLE {table_name}
  USING DELTA
  LOCATION "{path}"
  """)
 
  print(f"Register {table_name} using path: {path}")