Nan's Blog
Wednesday, July 31, 2024
Tableau - trigger starting extract job
SQL - Why are the counts different?
Be careful when using count(distinct col1, col2, col3, ...)!
If any of the columns inside count(distinct ...) contains NULLs, every row where any of those columns is NULL is excluded from the count entirely, so the result can be lower than expected.
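A minimal PySpark sketch of the pitfall (the table and values are made up for illustration, and it assumes a running SparkSession):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# One of the three rows has a NULL in col2.
df = spark.createDataFrame([("a", 1), ("a", None), ("b", 2)], ["col1", "col2"])
df.createOrReplaceTempView("t")

# The ('a', NULL) row is skipped entirely: returns 2, not 3.
spark.sql("select count(distinct col1, col2) as cnt from t").show()

# Coalescing NULLs to a sentinel keeps that row: returns 3.
spark.sql("select count(distinct col1, coalesce(cast(col2 as string), '_null_')) as cnt from t").show()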
Tuesday, October 17, 2023
Python - get folder size from an S3 bucket
import boto3

bucket_name = 'xxxxxxx'

def get_size(bucket, path):
    # Sum the size of every object under the given prefix; returns MB.
    s3 = boto3.resource('s3')
    my_bucket = s3.Bucket(bucket)
    total_size = 0
    for obj in my_bucket.objects.filter(Prefix=path):
        total_size = total_size + obj.size
    return total_size / 1024 / 1024

def list_folders(s3_client, bucket_name):
    # Yield the top-level "folder" prefixes in the bucket.
    response = s3_client.list_objects_v2(Bucket=bucket_name, Prefix='', Delimiter='/')
    for content in response.get("CommonPrefixes", []):
        yield content.get('Prefix')

s3_client = boto3.client('s3')
folder_list = list_folders(s3_client, bucket_name)
for folder in folder_list:
    print('Folder found: %s' % folder)
    print(folder, get_size(bucket_name, folder))
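Note that a single list_objects_v2 call returns at most 1,000 results, so a bucket with many top-level prefixes would be truncated. A sketch of a variant using boto3's built-in paginator:

def list_folders_paginated(s3_client, bucket_name):
    # get_paginator transparently follows continuation tokens across pages.
    paginator = s3_client.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket_name, Prefix='', Delimiter='/'):
        for prefix in page.get('CommonPrefixes', []):
            yield prefix.get('Prefix')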
Wednesday, September 27, 2023
Python - Finding Correlation Between Many Variables (Multidimensional Dataset)
import numpy as np
import pandas as pd
import matplotlib
matplotlib.use('TkAgg')  # use the Tk backend so the plot opens in a local window
import matplotlib.pyplot as plt

# Load the dataset; the first column is used as the row index.
data = pd.read_csv('/Users/Downloads/test.csv', index_col=0)

# Pairwise correlation matrix of all numeric columns.
corr = data.corr()

# Render the matrix as a heatmap with the color scale fixed to [-1, 1].
fig = plt.figure()
ax = fig.add_subplot(111)
cax = ax.matshow(corr, cmap='coolwarm', vmin=-1, vmax=1)
fig.colorbar(cax)

# Label every row/column tick with its variable name.
ticks = np.arange(0, len(data.columns), 1)
ax.set_xticks(ticks)
plt.xticks(rotation=90)
ax.set_yticks(ticks)
ax.set_xticklabels(data.columns)
ax.set_yticklabels(data.columns)
plt.show()
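For wide datasets the heatmap can be hard to read; a small follow-up sketch (reusing corr from above) lists the most strongly correlated pairs numerically:

# Keep only the upper triangle (k=1 excludes the diagonal) to avoid
# self-correlations and duplicate pairs, then rank by absolute value.
mask = np.triu(np.ones(corr.shape, dtype=bool), k=1)
pairs = corr.where(mask).stack().sort_values(key=abs, ascending=False)
print(pairs.head(10))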
Thursday, September 21, 2023
PySpark SQL - split records whose time range spans multiple days
Example: an input row (A) whose StartDateTime and EndDateTime fall on different days becomes one output row per day (B).
Logic:
1. Use sequence() to list the interval dates between the start and end dates.
2. Use explode() to create duplicate rows, one per interval date.
3. Use lag() and row_number() to create the extra date columns needed in the next step.
4. Use a CASE statement to pick different dates based on row_no, which marks the first/last row. (A minimal runnable demo of steps 1-2 follows the code below.)
Code:
with tbl_explode as (
    select StartDateTime
         , explode(sequence(to_date(dateadd(day, 1, StartDateTime)), to_date(dateadd(day, 1, EndDateTime)), interval 1 day)) as interval_date
         , EndDateTime
         , *
    from tbl_1
    -- only records that actually cross a day boundary need splitting
    where datediff(day, StartDateTime, EndDateTime) > 0 or cast(StartDateTime as date) <> cast(EndDateTime as date)
),
tbl_time_split as (
    select CASE WHEN row_no_min = 1 THEN StartDateTime
                ELSE to_timestamp(lag_date) END AS new_StartDateTime
         , CASE WHEN row_no_max = 1 THEN to_timestamp(EndDateTime)
                ELSE to_timestamp(interval_date) END AS new_EndDateTime
         , *
    from (
        select interval_date
             , lag(interval_date, 1) OVER (PARTITION BY WO ORDER BY interval_date) as lag_date
             , row_number() OVER (PARTITION BY WO, EventDate ORDER BY interval_date) as row_no_min
             , row_number() OVER (PARTITION BY WO, EventDate ORDER BY interval_date desc) as row_no_max
             , *
        from tbl_explode
    ) d
)
select * from tbl_time_split
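A minimal, self-contained PySpark demo of steps 1-2 (the work-order row below is made up for illustration, and it assumes a running SparkSession):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.createDataFrame(
    [("WO-1", "2023-09-18 22:00:00", "2023-09-20 03:00:00")],
    ["WO", "StartDateTime", "EndDateTime"],
).createOrReplaceTempView("tbl_1")

# One row in, three rows out: one per calendar day the record touches.
spark.sql("""
    select WO, StartDateTime, EndDateTime,
           explode(sequence(to_date(StartDateTime), to_date(EndDateTime), interval 1 day)) as interval_date
    from tbl_1
""").show(truncate=False)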
Wednesday, June 28, 2023
Azure - functions to get a schema from Schema Registry using the API
import json
import requests

AZURE_TENANT_ID = 'xxxx'
AZURE_CLIENT_ID = 'xxxx'
AZURE_CLIENT_SECRET = 'xxxx'
group_name = "xxxx"

auth_url = f'https://login.microsoftonline.com/{AZURE_TENANT_ID}/oauth2/v2.0/token'
headers = {
    'Content-Type': 'application/x-www-form-urlencoded',
    "User-Agent": "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/64.0.3282.140 Safari/537.36"
}

class GetSchemaFromAzure:
    def __init__(self, schema_name, fully_qualified_namespace):
        self._fully_qualified_namespace = fully_qualified_namespace
        self._schema_group = group_name
        self._schema_name = schema_name

    def _get_azure_schema(self):
        # GET AUTH TOKEN TO GET SCHEMA
        _params = {"grant_type": 'client_credentials',
                   "client_id": AZURE_CLIENT_ID,
                   "client_secret": AZURE_CLIENT_SECRET,
                   "scope": "https://eventhubs.azure.net/.default"
                   }
        _auth_session = requests.session()
        _auth_response = _auth_session.post(auth_url, headers=headers, data=_params)
        _access_token = _auth_response.json()['access_token']
        # PASS TOKEN TO CALL THE SCHEMA REGISTRY API
        _api_url = f'https://{self._fully_qualified_namespace}/$schemagroups/{self._schema_group}/schemas/{self._schema_name}?api-version=2020-09-01-preview'
        print(_api_url)
        headers['Authorization'] = f"Bearer {_access_token}"
        _response = requests.get(_api_url, headers=headers)
        res = _response.json()
        return res

    def _get_avro_schema(self):
        # Assumes a Spark session named `spark` (e.g. Databricks); the source
        # path for the Avro files still needs to be passed to load().
        _schema = self._get_azure_schema()
        _schema_df = spark.read.format("avro").option("avroSchema", json.dumps(_schema)).load()
        return _schema_df

    def get_str_schema(self):
        # Flatten the Avro field list into a "name TYPE, name TYPE, ..." string.
        _schema = self._get_azure_schema()
        _json_schema = _schema['fields']
        _list_schema = []
        for col in _json_schema:
            # Normalize scalar types to a list so unions and scalars are handled alike.
            if not isinstance(col['type'], list):
                col['type'] = list(col['type'].split(" "))
            _col_schema = col['name'] + " " + col['type'][0].upper()
            _list_schema.append(_col_schema)
        print("header count: ", len(_list_schema))
        str_schema = ", ".join(_list_schema)
        return str_schema
Example:
AzureSchema = GetSchemaFromAzure(schema_name='xxx', fully_qualified_namespace=fully_qualified_namespace)
AzureSchema.get_str_schema()
Databricks - some useful functions
Function to drop duplicate columns when joining different dataframes
def dropDupeDfCols(df):
    # Collect the first occurrence of each column name; remember the
    # positional index of every later duplicate.
    newcols = []
    dupcols = []
    for i in range(len(df.columns)):
        if df.columns[i] not in newcols:
            newcols.append(df.columns[i])
        else:
            dupcols.append(i)
    # Temporarily rename columns to their positions so the duplicates can be
    # dropped unambiguously, then restore the de-duplicated names.
    df = df.toDF(*[str(i) for i in range(len(df.columns))])
    for dupcol in dupcols:
        df = df.drop(str(dupcol))
    return df.toDF(*newcols)
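A hypothetical usage sketch (the DataFrames are made up, and it assumes a SparkSession named spark): joining on id duplicates the id column, and dropDupeDfCols keeps only the first occurrence.

a = spark.createDataFrame([(1, "x")], ["id", "val_a"])
b = spark.createDataFrame([(1, "y")], ["id", "val_b"])
joined = a.join(b, a.id == b.id)       # columns: id, val_a, id, val_b
print(dropDupeDfCols(joined).columns)  # ['id', 'val_a', 'val_b']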
Function to flatten a nested list
def removeNestings(l, output=None):
    # Recursively flatten nested lists into `output` and return it
    # (the original relied on a global `output` list).
    if output is None:
        output = []
    for i in l:
        if type(i) == list:
            removeNestings(i, output)
        else:
            output.append(i)
    return output
Function to dynamically add columns with NULL values
from functools import reduce
from pyspark.sql.functions import lit

def add_col(df, cl):
    # lit(None) produces a true NULL (lit('null') would be the string 'null');
    # the cast gives the new column a concrete type.
    return df.withColumn(cl, lit(None).cast('string'))

# reduce() applies add_col once per name in new_col_list, starting from targetDF.
test_df = reduce(add_col, new_col_list, targetDF)