import json
import os

import requests
# Service-principal credentials for the OAuth2 client-credentials token
# request. They are read from the environment when set, so the client secret
# does not have to live in source control. The 'xxxx' placeholders remain as
# fallbacks so behaviour is unchanged when the variables are absent.
AZURE_TENANT_ID = os.environ.get('AZURE_TENANT_ID', 'xxxx')
AZURE_CLIENT_ID = os.environ.get('AZURE_CLIENT_ID', 'xxxx')
AZURE_CLIENT_SECRET = os.environ.get('AZURE_CLIENT_SECRET', 'xxxx')

# Default schema group used by GetSchemaFromAzure.
group_name = "xxxx"

# Microsoft identity platform v2.0 token endpoint for the tenant above.
auth_url = f'https://login.microsoftonline.com/{AZURE_TENANT_ID}/oauth2/v2.0/token'

# Base headers for the token POST. GetSchemaFromAzure also reuses them for the
# schema GET. Treat them as read-only shared defaults.
headers = {
    'Content-Type': 'application/x-www-form-urlencoded',
    "User-Agent": "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/64.0.3282.140 Safari/537.36",
}
class GetSchemaFromAzure:
    """Fetch a schema from an Azure Schema Registry group and convert it.

    Authentication uses the OAuth2 client-credentials flow. It reads the
    module-level ``AZURE_CLIENT_ID`` / ``AZURE_CLIENT_SECRET`` and posts to
    ``auth_url``. The resulting bearer token is then used to GET the schema
    from ``https://<fully_qualified_namespace>/$schemagroups/...``.
    """

    # Seconds to wait for each HTTP call. ``requests`` has no default
    # timeout, so an unresponsive endpoint would otherwise hang forever.
    _timeout = 30

    def __init__(self, schema_name, fully_qualified_namespace, schema_group=None):
        """Store the lookup coordinates; no network I/O happens here.

        Args:
            schema_name: Name of the schema inside the group.
            fully_qualified_namespace: Host name of the namespace that hosts
                the schema registry.
            schema_group: Schema group to read from. Defaults to the
                module-level ``group_name``.
        """
        self._fully_qualified_namespace = fully_qualified_namespace
        self._schema_group = group_name if schema_group is None else schema_group
        self._schema_name = schema_name

    def _get_azure_schema(self):
        """Download the schema and return its parsed JSON body.

        Raises:
            requests.HTTPError: If the token or schema request fails.
            KeyError: If the token response has no ``access_token``.
        """
        # GET AUTH TOKEN TO GET SCHEMA
        _params = {
            "grant_type": 'client_credentials',
            "client_id": AZURE_CLIENT_ID,
            "client_secret": AZURE_CLIENT_SECRET,
            "scope": "https://eventhubs.azure.net/.default",
        }
        # The context manager closes the session's pooled connections.
        with requests.Session() as _auth_session:
            _auth_response = _auth_session.post(
                auth_url, headers=headers, data=_params, timeout=self._timeout
            )
        _auth_response.raise_for_status()
        _access_token = _auth_response.json()['access_token']

        # PASS TOKEN TO FETCH THE SCHEMA FROM THE SCHEMA REGISTRY
        _api_url = (
            f'https://{self._fully_qualified_namespace}/$schemagroups/'
            f'{self._schema_group}/schemas/{self._schema_name}'
            '?api-version=2020-09-01-preview'
        )
        print(_api_url)
        # Work on a copy. Writing the token into the shared module-level
        # ``headers`` would leak a stale Bearer token into every later
        # token request.
        _api_headers = dict(headers)
        _api_headers['Authorization'] = f"Bearer {_access_token}"
        _response = requests.get(_api_url, headers=_api_headers, timeout=self._timeout)
        _response.raise_for_status()
        return _response.json()

    def _get_avro_schema(self, path=None):
        """Return a Spark DataFrame read as Avro using the registry schema.

        Args:
            path: Optional path(s) handed to ``DataFrameReader.load``. ``None``
                keeps the original path-less ``load()`` call.
        """
        _schema = self._get_azure_schema()
        # NOTE(review): relies on a global ``spark`` session (e.g. a
        # notebook); it is not defined in this file — confirm.
        # Spark's ``avroSchema`` option expects the schema as a JSON string,
        # so the parsed dict is serialised back.
        _schema_df = (
            spark.read.format("avro")
            .option("avroSchema", json.dumps(_schema))
            .load(path)
        )
        return _schema_df

    def get_str_schema(self):
        """Return the schema's fields as ``"name TYPE, name TYPE, ..."``.

        Type names are upper-cased Avro base types; see ``_avro_type_name``.
        """
        _schema = self._get_azure_schema()
        _list_schema = [self._column_ddl(col) for col in _schema['fields']]
        print("header count: ", len(_list_schema))
        return ", ".join(_list_schema)

    @classmethod
    def _column_ddl(cls, col):
        """Render one Avro field dict as ``"<name> <TYPE>"``."""
        return f"{col['name']} {cls._avro_type_name(col['type']).upper()}"

    @classmethod
    def _avro_type_name(cls, avro_type):
        """Return the base type name of an Avro field type.

        * ``"string"`` is returned unchanged.
        * A union such as ``["null", "long"]`` yields its first non-``"null"``
          member, because ``"null"`` only marks the field as nullable. An
          all-null union yields ``"null"``.
        * A complex or logical type dict such as ``{"type": "long", ...}``
          yields the value of its ``"type"`` key.
        """
        if isinstance(avro_type, list):
            non_null = [member for member in avro_type if member != "null"]
            return cls._avro_type_name(non_null[0]) if non_null else "null"
        if isinstance(avro_type, dict):
            return cls._avro_type_name(avro_type["type"])
        return avro_type
# Example usage. Replace the placeholders with the schema name and the
# namespace host name (typically "<namespace>.servicebus.windows.net").
# The guard keeps the example from running, or making network calls, on import.
if __name__ == "__main__":
    azure_schema = GetSchemaFromAzure(
        schema_name="xxxx",
        fully_qualified_namespace="xxxx.servicebus.windows.net",
    )
    print(azure_schema.get_str_schema())