Wednesday, July 31, 2024

Tableau - trigger starting extract job


SQL - Why are the counts different?

 
Be careful when using count(distinct col1, col2, col3, ...)!

If any of the columns inside count(distinct ...) is NULL for a row, that whole row is excluded from the count, so the result can be lower than you expect.
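
A minimal PySpark sketch (made-up data; a Spark session is assumed, as in the other posts) showing the effect: a NULL drops the whole row from count(distinct col1, col2), while concatenating the columns first keeps it:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [("a", "1"), ("a", None), ("b", "2"), ("b", "2")],
    ["col1", "col2"],
)
df.createOrReplaceTempView("t")

spark.sql("""
    SELECT count(distinct col1, col2)                 AS cnt_pairs,   -- 2: the (a, NULL) row is skipped
           count(distinct concat_ws('|', col1, col2)) AS cnt_concat   -- 3: concat_ws skips NULLs, so the row is kept
    FROM t
""").show()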






Tuesday, October 17, 2023

Python - get folder size from s3 bucket

import boto3

bucket_name = 'xxxxxxx'

def get_size(bucket, path):
    # total size (in MB) of every object under the given prefix
    s3 = boto3.resource('s3')
    my_bucket = s3.Bucket(bucket)
    total_size = 0

    for obj in my_bucket.objects.filter(Prefix=path):
        total_size = total_size + obj.size

    return total_size / 1024 / 1024

def list_folders(s3_client, bucket_name):
    # yield the top-level "folders" (common prefixes) of the bucket
    response = s3_client.list_objects_v2(Bucket=bucket_name, Prefix='', Delimiter='/')
    for content in response.get("CommonPrefixes", []):
        yield content.get('Prefix')

s3_client = boto3.client('s3')
folder_list = list_folders(s3_client, bucket_name)
for folder in folder_list:
    print('Folder found: %s' % folder)
    print(folder, get_size(bucket_name, folder))

Wednesday, September 27, 2023

Python - Finding Correlation Between Many Variables (Multidimensional Dataset)

import numpy as np
import pandas as pd
import matplotlib
matplotlib.use('TkAgg')
import matplotlib.pyplot as plt

# load the dataset (first column used as the index)
data = pd.read_csv('/Users/Downloads/test.csv', index_col=0)

# pairwise correlation matrix of all columns
corr = data.corr()

# plot the matrix as a heatmap with the column names on both axes
fig = plt.figure()
ax = fig.add_subplot(111)
cax = ax.matshow(corr, cmap='coolwarm', vmin=-1, vmax=1)
fig.colorbar(cax)
ticks = np.arange(0, len(data.columns), 1)
ax.set_xticks(ticks)
plt.xticks(rotation=90)
ax.set_yticks(ticks)
ax.set_xticklabels(data.columns)
ax.set_yticklabels(data.columns)

plt.show()






Thursday, September 21, 2023

PySpark SQL - write out records whose times span multiple days

Example: a record whose StartDateTime and EndDateTime fall on different days (table A) is split into one output row per day (table B).
Logic:

1. Use sequence() to generate the list of interval dates between the start and end dates.

2. Use explode() to duplicate the row once per interval date (a small standalone sketch of these two steps follows the code below).

3. Use lag() and row_number() to create the extra date columns needed in the next step.

4. Use a CASE statement to pick the start/end dates based on row_no, which marks the first/last row of each group.

Code:

with tbl_explode as (
    select StartDateTime
         , explode(sequence(to_date(dateadd(day, 1, StartDateTime)), to_date(dateadd(day, 1, EndDateTime)), interval 1 day)) as interval_date
         , EndDateTime
         , *
    from tbl_1
    where datediff(day, StartDateTime, EndDateTime) > 0
       or cast(StartDateTime as date) <> cast(EndDateTime as date)
),
tbl_time_split as (
    select CASE WHEN row_no_min = 1 THEN StartDateTime
                ELSE to_timestamp(lag_date) END AS new_StartDateTime
         , CASE WHEN row_no_max = 1 THEN to_timestamp(EndDateTime)
                ELSE to_timestamp(interval_date) END AS new_EndDateTime
         , *
    from (
        select interval_date
             , lag(interval_date, 1) OVER (PARTITION BY WO ORDER BY interval_date) as lag_date
             , row_number() OVER (PARTITION BY WO, EventDate ORDER BY interval_date) as row_no_min
             , row_number() OVER (PARTITION BY WO, EventDate ORDER BY interval_date desc) as row_no_max
             , *
        from tbl_explode
    ) d
)
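
A minimal, self-contained PySpark sketch of steps 1 and 2 (a made-up sample row; the WO/StartDateTime/EndDateTime column names are borrowed from the query above), showing how sequence() plus explode() turns one multi-day record into one row per calendar day:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# one record that starts late on the 20th and ends early on the 22nd
df = spark.createDataFrame(
    [("WO-1", "2023-09-20 22:00:00", "2023-09-22 03:00:00")],
    ["WO", "StartDateTime", "EndDateTime"],
)
df.createOrReplaceTempView("tbl_1")

spark.sql("""
    SELECT WO, StartDateTime, EndDateTime,
           explode(sequence(to_date(StartDateTime), to_date(EndDateTime), interval 1 day)) AS interval_date
    FROM tbl_1
""").show(truncate=False)
# -> three rows for WO-1, one per calendar day: 2023-09-20, 2023-09-21, 2023-09-22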

Wednesday, June 28, 2023

Azure - functions to get schema from schema registry using API

import requests
import json


AZURE_TENANT_ID = 'xxxx'
AZURE_CLIENT_ID = 'xxxx'
AZURE_CLIENT_SECRET = 'xxxx'
group_name = "xxxx"
auth_url = f'https://login.microsoftonline.com/{AZURE_TENANT_ID}/oauth2/v2.0/token'
headers = {
    'Content-Type': 'application/x-www-form-urlencoded',
    "User-Agent": "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/64.0.3282.140 Safari/537.36"
}


class GetSchemaFromAzure():
    def __init__(self, schema_name, fully_qualified_namespace):
        self._fully_qualified_namespace = fully_qualified_namespace
        self._schema_group = group_name
        self._schema_name = schema_name

    def _get_azure_schema(self):
        # GET AUTH TOKEN TO GET SCHEMA
        _params = {"grant_type": 'client_credentials',
                   "client_id": AZURE_CLIENT_ID,
                   "client_secret": AZURE_CLIENT_SECRET,
                   "scope": "https://eventhubs.azure.net/.default"
                   }

        _auth_session = requests.session()
        _auth_response = _auth_session.post(auth_url, headers=headers, data=_params)

        _access_token = _auth_response.json()['access_token']

        # PASS TOKEN TO CALL THE SCHEMA REGISTRY API
        _api_url = f'https://{self._fully_qualified_namespace}/$schemagroups/{self._schema_group}/schemas/{self._schema_name}?api-version=2020-09-01-preview'
        print(_api_url)

        headers['Authorization'] = f"Bearer {_access_token}"

        _response = requests.get(_api_url, headers=headers)
        res = _response.json()
        return res

    def _get_avro_schema(self):
        # apply the registry schema when reading Avro (the avroSchema option expects a JSON string)
        _schema = self._get_azure_schema()
        _schema_df = spark.read.format("avro").option("avroSchema", json.dumps(_schema)).load()
        return _schema_df

    def get_str_schema(self):
        # build a "col_name TYPE, col_name TYPE, ..." string from the registry schema fields
        _schema = self._get_azure_schema()
        _json_schema = _schema['fields']
        _list_schema = []
        for col in _json_schema:
            if not isinstance(col['type'], list):
                col['type'] = list(col['type'].split(" "))
            _col_schema = col['name'] + " " + col['type'][0].upper()
            _list_schema.append(_col_schema)
        print("header count: ", len(_list_schema))
        str_schema = ", ".join(col for col in _list_schema)

        return str_schema


Example:
AzureSchema = GetSchemaFromAzure(schema_name=xxx, fully_qualified_namespace=fully_qualified_namespace)
AzureSchema.get_str_schema()

Databricks - some useful functions

Function to drop duplicate columns when joining different dataframes


def dropDupeDfCols(df):
    # keep the first occurrence of each column name and record the positions of duplicates
    newcols = []
    dupcols = []

    for i in range(len(df.columns)):
        if df.columns[i] not in newcols:
            newcols.append(df.columns[i])
        else:
            dupcols.append(i)

    # temporarily rename columns to their positions so duplicates can be dropped by index
    df = df.toDF(*[str(i) for i in range(len(df.columns))])
    for dupcol in dupcols:
        df = df.drop(str(dupcol))

    return df.toDF(*newcols)
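
A short usage sketch (hypothetical df1/df2 sharing an "id" column) of the function above after a join:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df1 = spark.createDataFrame([(1, "a")], ["id", "val1"])
df2 = spark.createDataFrame([(1, "b")], ["id", "val2"])

# the join keeps both "id" columns; dropDupeDfCols keeps only the first one
joined = df1.join(df2, df1.id == df2.id)
deduped = dropDupeDfCols(joined)
print(deduped.columns)  # ['id', 'val1', 'val2']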

Function to flatten a nested list

def removeNestings(l, output=None):
    # recursively flatten a nested list into a single flat list
    if output is None:
        output = []
    for i in l:
        if isinstance(i, list):
            removeNestings(i, output)
        else:
            output.append(i)
    return output

Function to dynamically add columns with null values

from functools import reduce
from pyspark.sql.functions import lit

def add_col(df, cl):
    # add column `cl` filled with NULLs (cast so Spark knows the column type)
    return df.withColumn(cl, lit(None).cast("string"))

# new_col_list and targetDF are assumed to be defined earlier
test_df = reduce(add_col, new_col_list, targetDF)