Tuesday, October 17, 2023

Python - get folder size from s3 bucket

 import boto3

bucket_name = 'xxxxxxx'

def get_size(bucket, path):
s3 = boto3.resource('s3')
my_bucket = s3.Bucket(bucket)
total_size = 0

for obj in my_bucket.objects.filter(Prefix=path):
total_size = total_size + obj.size

return total_size/1024/1024

def list_folders(s3_client, bucket_name):
response = s3_client.list_objects_v2(Bucket=bucket_name, Prefix='', Delimiter='/')
for content in response.get("CommonPrefixes", []):
yield content.get('Prefix')

s3_client = boto3.client('s3')
folder_list = list_folders(s3_client, bucket_name)
for folder in folder_list:
print('Folder found: %s' % folder)
print(folder, get_size(bucket_name, folder))

Wednesday, September 27, 2023

Python - Finding Correlation Between Many Variables (Multidimensional Dataset) with Python

import numpy as np
import pandas as pd
import matplotlib
matplotlib.use('TkAgg')
import matplotlib.pyplot as plt

data = pd.read_csv('/Users/Downloads/test.csv', index_col=0)
corr = data.corr()
fig = plt.figure()

ax = fig.add_subplot(111)
cax = ax.matshow(corr, cmap='coolwarm', vmin=-1, vmax=1)
fig.colorbar(cax)
ticks = np.arange(0, len(data.columns), 1)
ax.set_xticks(ticks)
plt.xticks(rotation=90)
ax.set_yticks(ticks)
ax.set_xticklabels(data.columns)
ax.set_yticklabels(data.columns)

plt.show() 






Thursday, September 21, 2023

PySpark SQL - write out records with time across multiple days

 Example:

From A:

To B:

Logic:

1. Use sequence() to list out the interval dates between the start and end date.

2. Use explode() to create duplicate rows with the interval dates.

3. Use lag() and row_number() to create extra dates columns for the next steps.

4. Use CASE statement to select different dates based on the row_no to indicate the first/last row.

Code:

tbl_explode as (
select StartDateTime
, explode(sequence(to_date(dateadd(day,1,StartDateTime)), to_date(dateadd(day,1,EndDateTime)), interval 1 day)) as interval_date
, EndDateTime
, *
from tbl_1
where datediff(day, StartDateTime,EndDateTime) > 0 or cast(StartDateTime as date) <> cast(EndDateTimeUTC as date)
),
tbl_time_split as (
select CASE WHEN row_no_min = 1 THEN StartDateTime
ELSE to_timestamp(lag_date) END AS new_StartDateTime
, CASE WHEN row_no_max = 1 THEN to_timestamp(EndDateTime)
ELSE to_timestamp(interval_date) END AS new_EndDateTime
, *
from(
select interval_date
, lag(interval_date, 1) OVER(PARTITION BY WO ORDER BY interval_date) as lag_date
, row_number() OVER (PARTITION BY WO, EventDate ORDER BY interval_date ) as row_no_min
, row_number() OVER (PARTITION BY WO, EventDate ORDER BY interval_date desc ) as row_no_max
, *
from tbl_explode
) d
)

Wednesday, June 28, 2023

Azure - functions to get schema from schema registry using API

 import requests


AZURE_TENANT_ID = 'xxxx'
AZURE_CLIENT_ID = 'xxxx'
AZURE_CLIENT_SECRET = 'xxxx'
group_name = "xxxx"
auth_url = f'https://login.microsoftonline.com/{AZURE_TENANT_ID}/oauth2/v2.0/token'
headers = {
'Content-Type': 'application/x-www-form-urlencoded',
"User-Agent": "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/64.0.3282.140 Safari/537.36"
}

class GetSchemaFromAzure():
def __init__(self, schema_name, fully_qualified_namespace):
self._fully_qualified_namespace = fully_qualified_namespace
self._schema_group = group_name
self._schema_name = schema_name
def _get_azure_schema(self):
# GET AUTH TOKEN TO GET SCHEMA
_params = {"grant_type": 'client_credentials',
"client_id": AZURE_CLIENT_ID,
"client_secret": AZURE_CLIENT_SECRET,
"scope": "https://eventhubs.azure.net/.default"
}

_auth_session = requests.session()
_auth_response = _auth_session.post(auth_url, headers=headers, data=_params)

_access_token = _auth_response.json()['access_token']

# PASS TOKEN TO INVOKE ADF PIPELINE TRIGGER
_api_url = f'https://{self._fully_qualified_namespace}/$schemagroups/{self._schema_group}/schemas/{self._schema_name}?api-version=2020-09-01-preview'
print(_api_url)

headers['Authorization'] = f"Bearer {_access_token}"

_response = requests.get(_api_url, headers=headers)
res = _response.json()
return res

def _get_avro_schema(self):
_schema = self._get_azure_schema()
_schema_df = spark.read.format("avro").option("avroSchema", schema).load()

def get_str_schema(self):
_schema = self._get_azure_schema()
_json_schema = _schema['fields']
_list_schema = []
for col in _json_schema:
if not isinstance(col['type'], list):
col['type'] = list(col['type'].split(" "))
_col_schema = col['name'] + " " + col['type'][0].upper()
_list_schema.append(_col_schema)
print("header count: ", len(_list_schema))
str_schema = ", ".join(col for col in _list_schema)

return str_schema


Example:
AzureSchema = GetSchemaFromAzure(schema_name=xxx, fully_qualified_namespace=fully_qualified_namespace)
AzureSchema.get_str_schema()

Databricks - some useful functions

 Function to drop duplicate columns when join different dataframes


def dropDupeDfCols(df):
newcols = []
dupcols = []

for i in range(len(df.columns)):
if df.columns[i] not in newcols:
newcols.append(df.columns[i])
else:
dupcols.append(i)

df = df.toDF(*[str(i) for i in range(len(df.columns))])
for dupcol in dupcols:
df = df.drop(str(dupcol))

return df.toDF(*newcols)

Function to remove nested list

def removeNestings(l):
for i in l:
if type(i) == list:
removeNestings(i)
else:
output.append(i)

Function to dynamic add columns with null value

from functools import reduce

def add_col(df, cl):
return df.withColumn(cl, lit('null'))

test_df = reduce(add_col, new_col_list, targetDF)

Tuesday, May 9, 2023

Databricks - DLT some notes



When to use views, materialized views, and streaming tables

To ensure your pipelines are efficient and maintainable, choose the best dataset type when you implement your pipeline queries.

Consider using a view when:

  • You have a large or complex query that you want to break into easier-to-manage queries.

  • You want to validate intermediate results using expectations.

  • You want to reduce storage and compute costs and do not require the materialization of query results. Because tables are materialized, they require additional computation and storage resources.

Consider using a materialized view when:

  • Multiple downstream queries consume the table. Because views are computed on demand, the view is re-computed every time the view is queried.

  • The table is consumed by other pipelines, jobs, or queries. Because views are not materialized, you can only use them in the same pipeline.

  • You want to view the results of a query during development. Because tables are materialized and can be viewed and queried outside of the pipeline, using tables during development can help validate the correctness of computations. After validating, convert queries that do not require materialization into views.

Consider using a streaming table when:

  • A query is defined against a data source that is continuously or incrementally growing.

  • Query results should be computed incrementally.

  • High throughput and low latency is desired for the pipeline.

2

 By default, Delta Live Tables recomputes table results based on input data each time a pipeline is updated, so you need to make sure the deleted record isn’t reloaded from the source data. Setting the pipelines.reset.allowed table property to false prevents refreshes to a table, but does not prevent incremental writes to the tables or prevent new data from flowing into the table.

3

By contrast, the final tables in a pipeline, commonly referred to as gold tables, often require 
complicated aggregations or read from sources that are the targets of an 
APPLY CHANGES INTO operation. Because these operations inherently create updates rather than appends, they are not supported as inputs to streaming tables. These transformations are better suited for materialized views. By mixing streaming tables and materialized views into a single pipeline, you can simplify your pipeline and avoid costly re-ingestion or re-processing of raw data and have the full power of SQL to compute complex aggregations over an efficiently encoded and filtered dataset.



1

By default, streaming tables require append-only sources. When a streaming table uses another streaming table as a source, and the source streaming table requires updates or deletes, for example, GDPR “right to be forgotten” processing, the skipChangeCommits flag can be set on the target streaming table to ignore those changes.




1

Limitations

  • Metrics for the target table, such as number of output rows, are not available.

  • SCD type 2 updates will add a history row for every input row, even if no columns have changed.

  • The target of the APPLY CHANGES INTO query or apply_changes function cannot be used as a source for a streaming table. A table that reads from the target of an APPLY CHANGES INTO query or apply_changes function must be a live table.

  • Expectations are not supported in an APPLY CHANGES INTO query or apply_changes() function. To use expectations for the source or target dataset:

    • Add expectations on source data by defining an intermediate table with the required expectations and use this dataset as the source for the target table.

    • Add expectations on target data with a downstream table that reads input data from the target table.


1

In Python, Delta Live Tables determines whether to update a dataset as a materialized view or streaming table based on the defining query. The @table decorator is used to define both materialized views and streaming tables.

To define a materialized view in Python, apply @table to a query that performs a static read against a data source. To define a streaming table, apply @table to a query that performs a streaming read against a data source. Both dataset types have the same syntax.

2

For tables less than 1 TB in size, Databricks recommends letting Delta Live Tables control data organization. Unless you expect your table to grow beyond a terabyte, you should generally not specify partition columns.


3

Configure a streaming table to ignore changes in a source streaming table

Note

  • To use the skipChangeCommits flag, you must select the Preview channel in your pipeline settings.

  • The skipChangeCommits flag works only with spark.readStream using the option() function. You cannot use this flag in a dlt.read_stream() function.


By default, streaming tables require append-only sources. When a streaming table uses another streaming table as a source, and the source streaming table requires updates or deletes, for example, GDPR “right to be forgotten” processing, the skipChangeCommits flag can be set on the target streaming table to ignore those changes.



1


Note

In Databricks Runtime 12.1 and above, skipChangeCommits deprecates the previous setting ignoreChanges. In Databricks Runtime 12.0 and lower, ignoreChanges is the only supported option.

The semantics for ignoreChanges differ greatly from skipChangeCommits. With ignoreChanges enabled, rewritten data files in the source table are re-emitted after a data changing operation such as UPDATEMERGE INTODELETE (within partitions), or OVERWRITE. Unchanged rows are often emitted alongside new rows, so downstream consumers must be able to handle duplicates. Deletes are not propagated downstream. ignoreChanges subsumes ignoreDeletes.

skipChangeCommits disregards file changing operations entirely. Data files that are rewritten in the source table due to data changing operation such as UPDATEMERGE INTODELETE, and OVERWRITE are ignored entirely. In order to reflect changes in upstream source tables, you must implement separate logic to propagate these changes.



Monday, April 10, 2023

Python - Create CredFile to encode secrets locally

 from cryptography.fernet import Fernet

import re
import ctypes
import time
import os
import sys


class Credentials():

def __init__(self):
self.__username = ""
self.__key = ""
self.__password = ""
self.__key_file = 'key.key'
self.__time_of_exp = -1

# ----------------------------------------
# Getter setter for attributes
# ----------------------------------------

@property
def username(self):
return self.__username

@username.setter
def username(self, username):
while (username == ''):
username = input('Enter a proper User name, blank is not accepted:')
self.__username = username

@property
def password(self):
return self.__password

@password.setter
def password(self, password):
self.__key = Fernet.generate_key()
f = Fernet(self.__key)
self.__password = f.encrypt(password.encode()).decode()
del f

@property
def expiry_time(self):
return self.__time_of_exp

@expiry_time.setter
def expiry_time(self, exp_time):
if (exp_time >= 2):
self.__time_of_exp = exp_time

def create_cred(self):
"""
This function is responsible for encrypting the password and create key file for
storing the key and create a credential file with user name and password
"""

cred_filename = 'CredFile.ini'

with open(cred_filename, 'w') as file_in:
file_in.write("#Credential file:\nUsername={}\nPassword={}\nExpiry={}\n"
.format(self.__username, self.__password, self.__time_of_exp))
file_in.write("++" * 20)

# If there exists an older key file, This will remove it.
if (os.path.exists(self.__key_file)):
os.remove(self.__key_file)

# Open the Key.key file and place the key in it.
# The key file is hidden.
try:

os_type = sys.platform
if (os_type == 'linux'):
self.__key_file = '.' + self.__key_file

with open(self.__key_file, 'w') as key_in:
key_in.write(self.__key.decode())
# Hidding the key file.
# The below code snippet finds out which current os the script is running on and does the task base on it.
if (os_type == 'win32'):
ctypes.windll.kernel32.SetFileAttributesW(self.__key_file, 2)
else:
pass

except PermissionError:
os.remove(self.__key_file)
print("A Permission error occurred.\n Please re run the script")
sys.exit()

self.__username = ""
self.__password = ""
self.__key = ""
self.__key_file


def main():
# Creating an object for Credentials class
creds = Credentials()

# Accepting credentials
creds.username = input("Enter UserName:")
creds.password = input("Enter Password:")
print("Enter the epiry time for key file in minutes, [default:Will never expire]")
creds.expiry_time = int(input("Enter time:") or '-1')

# calling the Credit
creds.create_cred()
print("**" * 20)
print("Cred file created successfully at {}"
.format(time.ctime()))

if not (creds.expiry_time == -1):
os.startfile('expire.py')

print("**" * 20)


if __name__ == "__main__":
main()