Wednesday, June 28, 2023

Azure — functions to fetch a schema from the Azure Schema Registry via its REST API

import requests


# Azure AD service-principal credentials (placeholders — inject real values
# from a secret store, never commit them to source control).
AZURE_TENANT_ID = 'xxxx'
AZURE_CLIENT_ID = 'xxxx'
AZURE_CLIENT_SECRET = 'xxxx'

# Schema Registry group that contains the schemas we fetch.
group_name = "xxxx"

# OAuth2 client-credentials token endpoint for this tenant.
auth_url = f'https://login.microsoftonline.com/{AZURE_TENANT_ID}/oauth2/v2.0/token'

# Base headers shared by the token request and the schema request.
headers = {
    'Content-Type': 'application/x-www-form-urlencoded',
    "User-Agent": "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/64.0.3282.140 Safari/537.36",
}

class GetSchemaFromAzure():
    """Fetch a schema from an Azure Event Hubs Schema Registry via REST.

    Authenticates with the module-level service-principal credentials using
    the OAuth2 client-credentials flow, then retrieves the named schema from
    the registry at ``fully_qualified_namespace``.
    """

    def __init__(self, schema_name, fully_qualified_namespace):
        """
        Args:
            schema_name: Name of the schema to fetch.
            fully_qualified_namespace: Event Hubs namespace host,
                e.g. ``<namespace>.servicebus.windows.net``.
        """
        self._fully_qualified_namespace = fully_qualified_namespace
        # Schema group comes from the module-level `group_name` constant.
        self._schema_group = group_name
        self._schema_name = schema_name

    def _get_azure_schema(self):
        """Return the schema document (parsed JSON) from the Schema Registry.

        Raises:
            requests.HTTPError: if the token or schema request fails.
        """
        # OAuth2 client-credentials flow: exchange the service principal's
        # credentials for a bearer token scoped to Event Hubs.
        _params = {
            "grant_type": 'client_credentials',
            "client_id": AZURE_CLIENT_ID,
            "client_secret": AZURE_CLIENT_SECRET,
            "scope": "https://eventhubs.azure.net/.default",
        }

        with requests.session() as _auth_session:
            _auth_response = _auth_session.post(
                auth_url, headers=headers, data=_params, timeout=30)
            _auth_response.raise_for_status()  # fail fast on bad credentials
            _access_token = _auth_response.json()['access_token']

        # Fetch the schema itself, passing the bearer token.
        _api_url = f'https://{self._fully_qualified_namespace}/$schemagroups/{self._schema_group}/schemas/{self._schema_name}?api-version=2020-09-01-preview'
        print(_api_url)

        # Copy the base headers instead of mutating the shared module-level
        # dict (the original wrote the token into `headers` for everyone).
        _headers = dict(headers)
        _headers['Authorization'] = f"Bearer {_access_token}"

        _response = requests.get(_api_url, headers=_headers, timeout=30)
        _response.raise_for_status()
        return _response.json()

    def _get_avro_schema(self):
        """Load a DataFrame using the registry schema as the Avro schema.

        NOTE(review): `spark` is assumed to be the Databricks-provided
        SparkSession global — confirm before use outside a notebook.
        The original `.load()` is kept without a path — verify a source
        path is not required by the caller.
        """
        import json  # local import: only this Databricks helper needs it

        _schema = self._get_azure_schema()
        # Original referenced an undefined name `schema` and discarded the
        # result; pass the fetched schema as a JSON string and return the df.
        _schema_df = spark.read.format("avro").option(
            "avroSchema", json.dumps(_schema)).load()
        return _schema_df

    def get_str_schema(self):
        """Return the schema as a ``name TYPE, name TYPE, ...`` DDL string.

        Assumes the registry returns an Avro record schema with a top-level
        'fields' list whose entries carry 'name' and 'type'.
        """
        _schema = self._get_azure_schema()
        _list_schema = []
        for col in _schema['fields']:
            # Avro types are either a plain string ("string") or a union
            # list (["string", "null"]); normalise and take the first entry.
            _type = col['type'] if isinstance(col['type'], list) else col['type'].split(" ")
            _list_schema.append(col['name'] + " " + _type[0].upper())
        print("header count: ", len(_list_schema))
        return ", ".join(_list_schema)


Example:
AzureSchema = GetSchemaFromAzure(schema_name="xxxx", fully_qualified_namespace=fully_qualified_namespace)
AzureSchema.get_str_schema()

Databricks - some useful functions

 Function to drop duplicate columns when joining different dataframes


def dropDupeDfCols(df):
    """Return *df* with duplicate-named columns removed, keeping the first
    occurrence of each name (useful after joins that duplicate key columns).
    """
    kept_names = []
    dup_positions = []
    for pos, name in enumerate(df.columns):
        if name in kept_names:
            dup_positions.append(pos)
        else:
            kept_names.append(name)

    # Rename every column to its positional index so duplicates become
    # distinguishable, drop the duplicate positions, then restore the
    # original (deduplicated) names.
    indexed = df.toDF(*[str(pos) for pos in range(len(df.columns))])
    for pos in dup_positions:
        indexed = indexed.drop(str(pos))
    return indexed.toDF(*kept_names)

Function to flatten a nested list

def removeNestings(l, output=None):
    """Flatten the arbitrarily nested list *l* into a single flat list.

    The original appended to a module-level ``output`` list that was never
    defined in this file (NameError at first call); the accumulator is now
    an explicit optional parameter and the flat list is returned.

    Args:
        l: A (possibly nested) list.
        output: Internal accumulator used by the recursion; callers
            normally omit it.

    Returns:
        A flat list of all non-list elements, in traversal order.
    """
    if output is None:
        output = []
    for item in l:
        # isinstance instead of type(i) == list: accepts list subclasses too.
        if isinstance(item, list):
            removeNestings(item, output)
        else:
            output.append(item)
    return output

Function to dynamically add columns with a null value

from functools import reduce

def add_col(df, cl):
    """Return *df* with column *cl* added, filled with the literal 'null'.

    NOTE(review): lit('null') produces the STRING "null", not a SQL NULL.
    If an actual null column is intended, lit(None) would be needed —
    confirm with callers before changing.  `lit` is presumably
    pyspark.sql.functions.lit (not imported in this snippet) — TODO confirm.
    """
    return df.withColumn(cl, lit('null'))

# Example usage: fold add_col over `new_col_list` (defined elsewhere) to add
# every listed column to `targetDF`.
test_df = reduce(add_col, new_col_list, targetDF)