Friday, March 11, 2022

PySpark - deal with column renaming and data type changes

 Data Column name mapping


if need_mapping == "true":

df_transformed = df_raw.select(csv_cols_list)
for csv_col,rpt_col in zip(csv_cols_list, report_cols_list):
df_transformed = df_transformed.withColumnRenamed(csv_col,rpt_col)
else:
df_transformed = df_raw


Data type changes:

e.g. string value like ($3,400) change to -3400

for item in numeric_cols_list:
df = df.withColumn(item, when(instr(col(item), "(") > 0,
concat(F.lit('-'),F.regexp_replace(item, "[(,\$#),]",'')))
.otherwise(F.regexp_replace(item, "[(,\$#),]", '')).cast("double"))



Thursday, March 10, 2022

Python - upload file to S3 using boto3

Get S3 Session


import boto3


def _get_s3_session():
envvars = subprocess.check_output(
['aws-vault', 'exec', 'your-aws-role', '--', 'env'])

aws_access_key_id = ''
aws_secret_access_key = ''
aws_session_token = ''
for envline in envvars.split(b'\n'):
line = envline.decode('utf8')
eqpos = line.find('=')
if eqpos < 4:
continue
k = line[0:eqpos]
v = line[eqpos + 1:]
if k == 'AWS_ACCESS_KEY_ID':
aws_access_key_id = v
if k == 'AWS_SECRET_ACCESS_KEY':
aws_secret_access_key = v
if k == 'AWS_SESSION_TOKEN':
aws_session_token = v

session = boto3.Session(aws_access_key_id, aws_secret_access_key, aws_session_token)
return session

Upload files to S3

class ProgressPercentage(object):
def __init__(self, filename, size=None, prefix_str=''):
self._filename = filename
if size is None:
self._size = float(os.path.getsize(filename))
else:
self._size = size
self._prefix_str = prefix_str
self._seen_so_far = 0
self._lock = threading.Lock()

def __call__(self, bytes_amount):
# To simplify we'll assume this is hooked up
# to a single filename.
with self._lock:
self._seen_so_far += bytes_amount
percentage = (self._seen_so_far / self._size) * 100
sys.stdout.write(
"\r%s %s / %s (%.2f%%)" % (
self._prefix_str,
self._seen_so_far,
self._size,
percentage))
sys.stdout.flush()
def _upload_to_s3(self, **kwargs):
start = time.time()

self._object_name = kwargs['object_name']
self._file_path = kwargs['path']
self._bucket_name = kwargs['bucket_name']

# If S3 object_name was not specified, use file_name
if self._object_name is None:
self._object_name = ntpath.basename(self._file_path )
# Upload the file
session = _get_s3_session()
s3_client = session.client("s3")

try:
# Perform the transfer
s3_client.upload_file(
self._file_path,
self._bucket_name,
self._object_name,
Callback=ProgressPercentage(
self._file_path,
prefix_str='Uploading file {:} -> {:}.{:}: '.format(
os.path.dirname(self._file_path), self._bucket_name, self._object_name)),
)


Python - Best practice for Python main function definition and program start/exit

exit()

The exit() will straightforwardly end the execution of the Python code/script. The code that follows thereafter the exit() command won’t be executed.

You can think of exit() as an alias for quit() (or vice-versa) in Python. They simply co-exist to make Python more user-friendly. However, it is not a good practice to use quit() and exit() in production code and should only be reserved for use in the interpreter.

sys.exit(main())

This will call the function main() and when main finishes, it will exit giving the system the return code that is the result of main().

A simplified example where this might be used:

def main():
try:
doSomething()
return 0
except:
return 1


if __name__ == "__main__":
    sys.exit(main())

exit() vs. sys.exit()

import sys
def foo():
try:
print("Demonstrating sys.exit()")
sys.exit()
print("This line will not be executed!")
except SystemExit:
# argument denoting exit status
print("Abnormal Termination! Encountered SystemExit")
def func():
print("Demonstrating exit()")
print(exit)
exit() # lines after this statement are ignored and are not executed
print("The previous command executes ignoring all the lines after the command")
# Calling both the function:
foo()
print()
func()

Result:

Demonstrating sys.exit()
Abnormal Termination! Encountered SystemExit

Demonstrating exit()
Use exit() or Ctrl-D (i.e. EOF) to exit

When you need something more fancy working with sys.exit() explicitly is a good idea. That’s basically your future approach. But don’t only think of scripts called directly, also think of installed packages. Setuptools has a cross-platform mechanism to define functions as entry points for scripts. If you have this in your setup.py:

from setuptools import setup

setup(
name='scraper',
version='1.0.0',
install_requires=[
'boto3>=1.17.17',
'keyring>=22.3.0',
'bs4',
'requests_ntlm'
],
entry_points={
'console_scripts': [
'scraper=scraper.__main__:main'
]
},
)

and install that package you can run your_script from the command line.

pip install -e ./
scraper  -c conf/conf_upload.json 

Template of code structure would be:

import sys
from argparse import ArgumentParser, Namespace
from typing import Dict, List
import yaml # just used as an example here for loading more configs, optional


def parse_arguments(cli_args: List[str] = None) -> Namespace:
parser = ArgumentParser()
# parser.add_argument()
# ...
return parser.parse_args(args=cli_args) # None defaults to sys.argv[1:]


def load_configs(args: Namespace) -> Dict:
try:
with open(args.config_path, 'r') as file_pointer:
config = yaml.safe_load(file_pointer)

# arrange and check configs here

return config
except Exception as err:
# log errors
print(err)
if err == "Really Bad":
raise err

# potentionally return some sane fallback defaults if desired/reasonable
sane_defaults = []
return sane_defaults


def main(args: Namespace = parse_arguments()) -> int:
try:
# maybe load some additional config files here or in a function called here
# e.g. args contains a path to a config folder; or use sane defaults
# if the config files are missing(if that is your desired behavior)
config = load_configs(args)
do_real_work(args, config)

except KeyboardInterrupt:
print("Aborted manually.", file=sys.stderr)
return 1

except Exception as err:
# (in real code the `except` would probably be less broad)
# Turn exceptions into appropriate logs and/or console output.

# log err
print("An unhandled exception crashed the application!", err)

# non-zero return code to signal error
# Can of course be more fine grained than this general
# "something went wrong" code.
return 1

return 0 # success


# __main__ support is still here to make this file executable without
# installing the package first.
if __name__ == "__main__":
sys.exit(main(parse_arguments()))

Another example,

import sys
import importlib
import argparse
import json


def main():
conf_default_path = '{:}/conf/conf_upload.json'.format(os.path.abspath(__file__ + "/../../"))
parser = argparse.ArgumentParser(description='Transform files from/to an AWS-S3 object.')
parser.add_argument("-v", "--verbose",
help="increase output verbosity",
action="store_true")
parser.add_argument("-c", "--conf-path",
help="path to pipeline json config file",
default=conf_default_path,
type=str, )

args = parser.parse_args()
if not os.path.exists(args.conf_path):
raise IOError('Pipeline JSON config file {:} does not exist!'.format(args.conf_path))

with open(args.conf_path) as f:
conf = json.load(f)

for stage in conf['pipeline']:
# import module
module = importlib.import_module('scraper.' + stage['module'])

# run module
module.run(**stage['params'])


if __name__ == '__main__':
sys.exit(main())

Wednesday, March 2, 2022

Azure Function - Schema Registry

 import os

from azure.schemaregistry import SchemaRegistryClient
from azure.identity import ClientSecretCredential


FULLY_QUALIFIED_NAMESPACE = "xxxxxx.servicebus.windows.net"
EVENT_HUB_NAME = "xxxxxx"
EVENT_HUB_POLICY_NAME = "xxxxxx"
EVENT_HUB_POLICY_PRIMARY_KEY = "xxxxxx"
SCHEMA_REGISTRY_GROUP_NAME = "xxxxxx"
AZURE_TENANT_ID = "xxxxxx"
AZURE_CLIENT_ID = "xxxxxx"
AZURE_CLIENT_SECRET = "xxxxxx"
format = "avro"

definition = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}]
}"""

token_credential = ClientSecretCredential(tenant_id=AZURE_TENANT_ID,
client_id= AZURE_CLIENT_ID, client_secret=AZURE_CLIENT_SECRET)

schema_registry_client = SchemaRegistryClient(FULLY_QUALIFIED_NAMESPACE, token_credential)
with schema_registry_client:
schema_registry_client.register_schema(SCHEMA_REGISTRY_GROUP_NAME, "TEST",
definition, format)
schema_properties = schema_registry_client.get_schema_properties(
group_name=SCHEMA_REGISTRY_GROUP_NAME,
name="TEST",
definition=definition,
format="Avro"
)
schema_id = schema_properties.id


print(f'schema_id: {schema_id}')
print(f'schema_format: {schema_properties.format}')

print('schema: ', schema_registry_client.get_schema(schema_id).definition)
print('SCHEMA_REGISTRY_GROUP_NAME: ', SCHEMA_REGISTRY_GROUP_NAME)

Azure Function - Manage Event Hub

 import os

from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import ClientSecretCredential

from azure.mgmt.eventhub import EventHubManagementClient
from azure.common.credentials import ServicePrincipalCredentials


FULLY_QUALIFIED_NAMESPACE = xxxxxxxx.servicebus.windows.net"
EVENT_HUB_NAME = "xxxxxxxx"
EVENT_HUB_POLICY_NAME = "xxxxxxxx"
EVENT_HUB_POLICY_PRIMARY_KEY = "xxxxxxxx"
SCHEMA_REGISTRY_GROUP_NAME = "xxxxxxxx"
AZURE_TENANT_ID = "9 xxxxxxxx"
AZURE_CLIENT_ID = "xxxxxxxx"
AZURE_CLIENT_SECRET = "xxxxxxxx"

STORAGE_ACCOUNT_NAME = 'xxxxxxxx'
SUBSCRIPTION_ID = 'xxxxxxxx'
RESOURCE_GROUP_NAME = 'xxxxxxxx'
EVENTHUB_NAMESPACE = 'xxxxxxxx'

token_credential = ClientSecretCredential(tenant_id=AZURE_TENANT_ID,
client_id= AZURE_CLIENT_ID, client_secret=AZURE_CLIENT_SECRET)
eventhub_client = EventHubManagementClient(token_credential,SUBSCRIPTION_ID)

EVENTHUB_NAME = 'xxxxxxxx'

# Create EventHub
print(" Create event hub...")
eventhub = eventhub_client.event_hubs.create_or_update(
RESOURCE_GROUP_NAME,
EVENTHUB_NAMESPACE,
EVENTHUB_NAME,
{
"message_retention_in_days": "1",
"partition_count": "2",
"status": "Active",
"capture_description": {
"enabled": True,
"encoding": "Avro",
"interval_in_seconds": "120",
"size_limit_in_bytes": "10485763",
"destination": {
"name": "EventHubArchive.AzureBlockBlob",
"storage_account_resource_id": "/subscriptions/" + SUBSCRIPTION_ID +
"/resourceGroups/" + RESOURCE_GROUP_NAME + "/providers/Microsoft.Storage/storageAccounts/"
+ STORAGE_ACCOUNT_NAME + "",
"blob_container": "container",
"archive_name_format": "{Namespace}/{EventHub}/{PartitionId}/{Year}
/{Month}/{Day}/{Hour}/{Minute}/{Second}"
}
}
}
)
print("Created EventHub: {}".format(eventhub))

# Create authorization rule
eventhub_rule = eventhub_client.event_hubs.create_or_update_authorization_rule(
resource_group_name=RESOURCE_GROUP_NAME,
namespace_name=EVENTHUB_NAMESPACE,
event_hub_name=EVENTHUB_NAME,
authorization_rule_name="AzureFunctionSendPolicy",
parameters={"rights":["SEND"]}
)

# Get authorization rule
eventhub_rule2 = eventhub_client.event_hubs.get_authorization_rule(
resource_group_name=RESOURCE_GROUP_NAME,
namespace_name=EVENTHUB_NAMESPACE,
event_hub_name=EVENTHUB_NAME,
authorization_rule_name="AzureFunctionSendPolicy"
)
print("get_authorization_rule() for manager for EventHub: {}\n".format(eventhub_rule2))

# List keys
namespace_keys = eventhub_client.event_hubs.list_keys(
resource_group_name=RESOURCE_GROUP_NAME,
namespace_name=EVENTHUB_NAMESPACE,
event_hub_name=EVENTHUB_NAME,
authorization_rule_name="AzureFunctionSendPolicy"
)
print("list_keys() for EventHub: {}\n".format(namespace_keys))
print("namespace_keys.primary_key:",namespace_keys.primary_key)