Nan's Blog
Tuesday, October 17, 2023
Python - get folder size from s3 bucket
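The body of this post did not survive beyond a stray import, so here is a minimal sketch of one way to do it with boto3 (the bucket and prefix names are placeholders, and it assumes AWS credentials are already configured; the pure size-summing helper is split out for clarity):

```python
def sum_object_sizes(pages):
    """Pure helper: total the Size field across list_objects_v2 result pages."""
    return sum(obj["Size"] for page in pages for obj in page.get("Contents", []))

def get_folder_size(bucket, prefix):
    """Total size in bytes of all objects under an S3 prefix ("folder")."""
    import boto3  # assumes boto3 is installed and credentials are configured
    s3 = boto3.client("s3")
    # list_objects_v2 returns at most 1000 keys per call, so paginate
    paginator = s3.get_paginator("list_objects_v2")
    return sum_object_sizes(paginator.paginate(Bucket=bucket, Prefix=prefix))

# Example (placeholder names):
# get_folder_size("my-bucket", "logs/2023/")
```

S3 has no real folders, so the "folder size" is just the sum of the sizes of all objects sharing the prefix.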
Wednesday, September 27, 2023
Python - Finding Correlation Between Many Variables (Multidimensional Dataset) with Python
import numpy as np
import pandas as pd
import matplotlib
matplotlib.use('TkAgg')
import matplotlib.pyplot as plt

# Load the dataset, using the first column as the index
data = pd.read_csv('/Users/Downloads/test.csv', index_col=0)

# Pairwise correlation matrix of all numeric columns
corr = data.corr()

# Plot the matrix as a heatmap, with the colour scale fixed to [-1, 1]
fig = plt.figure()
ax = fig.add_subplot(111)
cax = ax.matshow(corr, cmap='coolwarm', vmin=-1, vmax=1)
fig.colorbar(cax)

# Label both axes with the column names
ticks = np.arange(0, len(data.columns), 1)
ax.set_xticks(ticks)
ax.set_yticks(ticks)
ax.set_xticklabels(data.columns, rotation=90)
ax.set_yticklabels(data.columns)
plt.show()
Thursday, September 21, 2023
PySpark SQL - write out records with time across multiple days
Example:
From A:
Logic:
1. Use sequence() to list out the interval dates between the start and end date.
2. Use explode() to create duplicate rows with the interval dates.
3. Use lag() and row_number() to create extra dates columns for the next steps.
4. Use a CASE statement to select different dates based on row_no to indicate the first/last row.
Code:
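The original code block did not survive extraction, so here is a sketch of one way to realize the four steps above in Spark SQL (it assumes an input table events with columns id, start_ts, end_ts, and uses count() over the partition rather than lag() to detect the last row):

```python
# Spark SQL sketch: split records spanning multiple days into one row per day.
SPLIT_BY_DAY_SQL = """
WITH exploded AS (
  -- steps 1 and 2: list the dates between start and end, one row per date
  SELECT id, start_ts, end_ts,
         explode(sequence(to_date(start_ts), to_date(end_ts))) AS day
  FROM events
),
numbered AS (
  -- step 3: number the rows so the first/last day can be identified
  SELECT *,
         row_number() OVER (PARTITION BY id ORDER BY day) AS row_no,
         count(*)    OVER (PARTITION BY id)               AS n_days
  FROM exploded
)
-- step 4: keep the original timestamps only on the first/last day
SELECT id,
       CASE WHEN row_no = 1 THEN start_ts
            ELSE CAST(day AS TIMESTAMP) END AS start_ts,
       CASE WHEN row_no = n_days THEN end_ts
            ELSE CAST(date_add(day, 1) AS TIMESTAMP) - INTERVAL 1 SECOND
       END AS end_ts
FROM numbered
"""

# spark.sql(SPLIT_BY_DAY_SQL)  # run inside a Spark session
```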
Wednesday, June 28, 2023
Azure - functions to get schema from schema registry using API
import requests
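The function bodies are missing from this post; as a hedged sketch, a fetch against the Schema Registry REST endpoint might look like the following (the URL path, api-version value, and parameter names are assumptions, not verified against the current Azure API reference; the token is an AAD bearer token obtained separately):

```python
import requests

def build_schema_url(namespace, group, schema_name, api_version="2022-10"):
    """Build the (assumed) Event Hubs Schema Registry REST endpoint URL."""
    return (f"https://{namespace}.servicebus.windows.net/"
            f"$schemagroups/{group}/schemas/{schema_name}"
            f"?api-version={api_version}")

def get_schema(namespace, group, schema_name, token):
    """Fetch the latest version of a schema; raises on HTTP errors."""
    resp = requests.get(
        build_schema_url(namespace, group, schema_name),
        headers={"Authorization": f"Bearer {token}"},
    )
    resp.raise_for_status()
    return resp.text
```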
Databricks - some useful functions
Function to drop duplicate columns when joining different dataframes
Function to remove nested lists
Function to dynamically add columns with null values
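The function bodies are missing from this post; below are minimal sketches of what each might look like (the PySpark pieces assume a running Spark session and are illustrative only; the list-flattening helper is plain Python):

```python
from functools import reduce

def drop_duplicate_columns(joined_df, right_df, keys):
    """After a join, drop the right-hand copies of non-key columns that
    exist on both sides (assumes PySpark DataFrames)."""
    dupes = [c for c in right_df.columns if c not in keys]
    return reduce(lambda df, c: df.drop(right_df[c]), dupes, joined_df)

def flatten(nested):
    """Remove nesting from an arbitrarily nested list (pure Python)."""
    out = []
    for item in nested:
        if isinstance(item, list):
            out.extend(flatten(item))
        else:
            out.append(item)
    return out

def add_missing_columns(df, required):
    """Dynamically add any missing columns as NULLs (assumes PySpark)."""
    from pyspark.sql import functions as F
    for col in set(required) - set(df.columns):
        df = df.withColumn(col, F.lit(None))
    return df
```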
Tuesday, May 9, 2023
Databricks - DLT some notes
When to use views, materialized views, and streaming tables
To ensure your pipelines are efficient and maintainable, choose the best dataset type when you implement your pipeline queries.
Consider using a view when:
You have a large or complex query that you want to break into easier-to-manage queries.
You want to validate intermediate results using expectations.
You want to reduce storage and compute costs and do not require the materialization of query results. Because tables are materialized, they require additional computation and storage resources.
Consider using a materialized view when:
Multiple downstream queries consume the table. Because views are computed on demand, the view is re-computed every time the view is queried.
The table is consumed by other pipelines, jobs, or queries. Because views are not materialized, you can only use them in the same pipeline.
You want to view the results of a query during development. Because tables are materialized and can be viewed and queried outside of the pipeline, using tables during development can help validate the correctness of computations. After validating, convert queries that do not require materialization into views.
Consider using a streaming table when:
A query is defined against a data source that is continuously or incrementally growing.
Query results should be computed incrementally.
High throughput and low latency is desired for the pipeline.
By default, Delta Live Tables recomputes table results based on input data each time a pipeline is updated, so you need to make sure the deleted record isn't reloaded from the source data. Setting the pipelines.reset.allowed table property to false prevents refreshes to a table, but does not prevent incremental writes to the tables or prevent new data from flowing into the table.
By contrast, the final tables in a pipeline, commonly referred to as gold tables, often require complicated aggregations or read from sources that are the targets of an APPLY CHANGES INTO operation. Because these operations inherently create updates rather than appends, they are not supported as inputs to streaming tables. These transformations are better suited for materialized views. By mixing streaming tables and materialized views into a single pipeline, you can simplify your pipeline, avoid costly re-ingestion or re-processing of raw data, and have the full power of SQL to compute complex aggregations over an efficiently encoded and filtered dataset.
The skipChangeCommits flag can be set on the target streaming table to ignore those changes.
Limitations
Metrics for the target table, such as number of output rows, are not available.
SCD type 2 updates will add a history row for every input row, even if no columns have changed.
The target of the APPLY CHANGES INTO query or apply_changes function cannot be used as a source for a streaming table. A table that reads from the target of an APPLY CHANGES INTO query or apply_changes function must be a live table.
Expectations are not supported in an APPLY CHANGES INTO query or apply_changes() function. To use expectations for the source or target dataset:
Add expectations on source data by defining an intermediate table with the required expectations and use this dataset as the source for the target table.
Add expectations on target data with a downstream table that reads input data from the target table.
The @table decorator is used to define both materialized views and streaming tables. To define a materialized view, apply @table to a query that performs a static read against a data source. To define a streaming table, apply @table to a query that performs a streaming read against a data source. Both dataset types have the same syntax.
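As a minimal sketch (table names are placeholders; this only runs inside a Delta Live Tables pipeline, not as a standalone script):

```python
import dlt

# Materialized view: the query performs a static read
@dlt.table
def customers_clean():
    return spark.read.table("raw.customers").dropDuplicates(["id"])

# Streaming table: the query performs a streaming read
@dlt.table
def events_bronze():
    return spark.readStream.table("raw.events")
```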
For tables less than 1 TB in size, Databricks recommends letting Delta Live Tables control data organization. Unless you expect your table to grow beyond a terabyte, you should generally not specify partition columns.
Configure a streaming table to ignore changes in a source streaming table
Note
To use the skipChangeCommits flag, you must select the Preview channel in your pipeline settings. The skipChangeCommits flag works only with spark.readStream using the option() function. You cannot use this flag in a dlt.read_stream() function.
Note
In Databricks Runtime 12.1 and above, skipChangeCommits deprecates the previous setting ignoreChanges. In Databricks Runtime 12.0 and lower, ignoreChanges is the only supported option.
The semantics for ignoreChanges differ greatly from skipChangeCommits. With ignoreChanges enabled, rewritten data files in the source table are re-emitted after a data changing operation such as UPDATE, MERGE INTO, DELETE (within partitions), or OVERWRITE. Unchanged rows are often emitted alongside new rows, so downstream consumers must be able to handle duplicates. Deletes are not propagated downstream. ignoreChanges subsumes ignoreDeletes.
skipChangeCommits disregards file changing operations entirely. Data files that are rewritten in the source table due to data changing operations such as UPDATE, MERGE INTO, DELETE, and OVERWRITE are ignored entirely. In order to reflect changes in upstream source tables, you must implement separate logic to propagate these changes.
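A minimal sketch of setting the flag (table names are placeholders; this only runs inside a DLT pipeline, and per the note earlier requires the Preview channel):

```python
import dlt

@dlt.table
def orders_silver():
    return (
        spark.readStream
        # ignore file rewrites from UPDATE/DELETE/MERGE in the source table
        .option("skipChangeCommits", "true")
        .table("orders_bronze")
    )
```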
Monday, April 10, 2023
Python - Create CredFile to encode secrets locally
from cryptography.fernet import Fernet
import ctypes
import time
import os
import sys


class Credentials():
    def __init__(self):
        self.__username = ""
        self.__key = ""
        self.__password = ""
        self.__key_file = 'key.key'
        self.__time_of_exp = -1

    # ----------------------------------------
    # Getters / setters for attributes
    # ----------------------------------------
    @property
    def username(self):
        return self.__username

    @username.setter
    def username(self, username):
        while username == '':
            username = input('Enter a proper User name, blank is not accepted:')
        self.__username = username

    @property
    def password(self):
        return self.__password

    @password.setter
    def password(self, password):
        # Generate a fresh key and keep only the encrypted password in memory
        self.__key = Fernet.generate_key()
        f = Fernet(self.__key)
        self.__password = f.encrypt(password.encode()).decode()
        del f

    @property
    def expiry_time(self):
        return self.__time_of_exp

    @expiry_time.setter
    def expiry_time(self, exp_time):
        if exp_time >= 2:
            self.__time_of_exp = exp_time

    def create_cred(self):
        """
        Encrypts the password, creates a key file for storing the key,
        and creates a credential file with the user name and password.
        """
        cred_filename = 'CredFile.ini'
        with open(cred_filename, 'w') as file_in:
            file_in.write("#Credential file:\nUsername={}\nPassword={}\nExpiry={}\n"
                          .format(self.__username, self.__password, self.__time_of_exp))
            file_in.write("++" * 20)

        # If an older key file exists, remove it.
        if os.path.exists(self.__key_file):
            os.remove(self.__key_file)

        # Write the key to the key file and hide the file.
        try:
            os_type = sys.platform
            if os_type == 'linux':
                # A leading dot hides the file on Linux.
                self.__key_file = '.' + self.__key_file
            with open(self.__key_file, 'w') as key_in:
                key_in.write(self.__key.decode())
            if os_type == 'win32':
                # FILE_ATTRIBUTE_HIDDEN = 2 hides the file on Windows.
                ctypes.windll.kernel32.SetFileAttributesW(self.__key_file, 2)
        except PermissionError:
            os.remove(self.__key_file)
            print("A permission error occurred.\nPlease re-run the script.")
            sys.exit()

        # Clear the in-memory secrets once they are written to disk.
        self.__username = ""
        self.__password = ""
        self.__key = ""


def main():
    # Creating an object for the Credentials class
    creds = Credentials()

    # Accepting credentials
    creds.username = input("Enter UserName:")
    creds.password = input("Enter Password:")
    print("Enter the expiry time for the key file in minutes, [default: will never expire]")
    creds.expiry_time = int(input("Enter time:") or '-1')

    # Writing the credential and key files
    creds.create_cred()
    print("**" * 20)
    print("Cred file created successfully at {}".format(time.ctime()))
    if not (creds.expiry_time == -1):
        os.startfile('expire.py')  # Windows-only; launches the expiry watcher script
    print("**" * 20)


if __name__ == "__main__":
    main()