Showing posts with label Python. Show all posts
Showing posts with label Python. Show all posts

Tuesday, October 17, 2023

Python - get folder size from s3 bucket

 import boto3

# Name of the S3 bucket to scan (placeholder: set the real bucket name).
bucket_name = 'xxxxxxx'

def get_size(bucket, path):
    """Return the combined size, in MiB, of every object whose key starts with *path*.

    :param bucket: name of the S3 bucket.
    :param path: key prefix ("folder") to measure.
    """
    matching_objects = boto3.resource('s3').Bucket(bucket).objects.filter(Prefix=path)
    size_in_bytes = sum(item.size for item in matching_objects)
    # Bytes -> KiB -> MiB.
    return size_in_bytes / 1024 / 1024

def list_folders(s3_client, bucket_name):
    """Yield the top-level "folder" prefixes of an S3 bucket.

    With ``Delimiter='/'`` ListObjectsV2 groups keys by their first path segment
    and reports each group under ``CommonPrefixes``. One response holds at most
    1000 entries, so continuation tokens are followed until the listing is
    complete; without this, large buckets were silently truncated.

    :param s3_client: a boto3 S3 client.
    :param bucket_name: bucket to list.
    :return: generator of prefix strings such as ``"reports/"``.
    """
    request = {'Bucket': bucket_name, 'Prefix': '', 'Delimiter': '/'}
    while True:
        response = s3_client.list_objects_v2(**request)
        for content in response.get("CommonPrefixes", []):
            yield content.get('Prefix')
        if not response.get('IsTruncated'):
            break
        # Ask for the next page, starting where this one stopped.
        request['ContinuationToken'] = response['NextContinuationToken']

s3_client = boto3.client('s3')
# Lazily yields the bucket's top-level prefixes ("folders").
folder_list = list_folders(s3_client, bucket_name)
for folder in folder_list:
    print(f'Folder found: {folder}')
    # Second value is the folder's total size in MiB.
    print(folder, get_size(bucket_name, folder))

Wednesday, September 27, 2023

Python - Finding Correlation Between Many Variables (Multidimensional Dataset) with Python

import numpy as np
import pandas as pd
import matplotlib
matplotlib.use('TkAgg')
import matplotlib.pyplot as plt

# Load the dataset; the first CSV column becomes the row index.
data = pd.read_csv('/Users/Downloads/test.csv', index_col=0)
# Correlate numeric columns only. DataFrame.corr() silently drops non-numeric
# columns on older pandas and raises on them from pandas 2.0, so the axis
# labels must come from the correlation matrix itself, not from ``data``.
corr = data.select_dtypes(include='number').corr()
fig = plt.figure()

ax = fig.add_subplot(111)
# Pin the colour scale to the full correlation range [-1, 1].
cax = ax.matshow(corr, cmap='coolwarm', vmin=-1, vmax=1)
fig.colorbar(cax)
ticks = np.arange(0, len(corr.columns), 1)
ax.set_xticks(ticks)
ax.set_yticks(ticks)
# Vertical x labels so long column names do not overlap.
ax.set_xticklabels(corr.columns, rotation=90)
ax.set_yticklabels(corr.columns)

plt.show()






Wednesday, June 28, 2023

Azure - functions to get schema from schema registry using API

 import requests


# Service-principal credentials for the OAuth2 client-credentials flow.
# NOTE(review): placeholders; real secrets should come from a secret store or
# environment variables rather than source code.
AZURE_TENANT_ID = 'xxxx'
AZURE_CLIENT_ID = 'xxxx'
AZURE_CLIENT_SECRET = 'xxxx'
# Schema Registry group that GetSchemaFromAzure reads schemas from.
group_name = "xxxx"
# Azure AD v2.0 token endpoint for the tenant.
auth_url = f'https://login.microsoftonline.com/{AZURE_TENANT_ID}/oauth2/v2.0/token'
# Request headers; the Content-Type matches the form-encoded token request body.
headers = {
    'Content-Type': 'application/x-www-form-urlencoded',
    "User-Agent": "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/64.0.3282.140 Safari/537.36"
}

class GetSchemaFromAzure():
    """Fetch a schema from an Azure Event Hubs Schema Registry over its REST API.

    Relies on the module-level settings ``auth_url``, ``headers``, ``group_name``,
    ``AZURE_CLIENT_ID`` and ``AZURE_CLIENT_SECRET``.
    """

    def __init__(self, schema_name, fully_qualified_namespace):
        """
        :param schema_name: name of the schema within the schema group.
        :param fully_qualified_namespace: Event Hubs namespace host name, used as
            the host of the registry URL.
        """
        self._fully_qualified_namespace = fully_qualified_namespace
        self._schema_group = group_name
        self._schema_name = schema_name

    def _get_azure_schema(self):
        """Return the registered schema, decoded from JSON.

        Obtains an OAuth2 token with the client-credentials grant, then GETs
        ``https://<namespace>/$schemagroups/<group>/schemas/<name>``.

        :raises requests.HTTPError: if the token or the schema request fails.
        """
        # GET AUTH TOKEN TO GET SCHEMA
        _params = {"grant_type": 'client_credentials',
                   "client_id": AZURE_CLIENT_ID,
                   "client_secret": AZURE_CLIENT_SECRET,
                   "scope": "https://eventhubs.azure.net/.default"
                   }

        with requests.Session() as _auth_session:
            _auth_response = _auth_session.post(auth_url, headers=headers, data=_params)
        # Report auth failures as HTTP errors instead of a KeyError on the token.
        _auth_response.raise_for_status()
        _access_token = _auth_response.json()['access_token']

        # PASS TOKEN TO THE SCHEMA REGISTRY API TO FETCH THE SCHEMA
        _api_url = f'https://{self._fully_qualified_namespace}/$schemagroups/{self._schema_group}/schemas/{self._schema_name}?api-version=2020-09-01-preview'
        print(_api_url)

        # Build per-request headers instead of writing the token into the shared
        # module-level dict, where it would leak into every later request.
        _request_headers = dict(headers)
        _request_headers['Authorization'] = f"Bearer {_access_token}"

        _response = requests.get(_api_url, headers=_request_headers)
        _response.raise_for_status()
        return _response.json()

    def _get_avro_schema(self, path=None):
        """Load Avro data with Spark, using the registry schema as the reader schema.

        NOTE(review): relies on a global ``spark`` session (as in a Databricks
        notebook); it is not defined in this snippet.

        :param path: location of the Avro data to load (``None`` keeps the
            original path-less ``load()`` call).
        :return: the loaded Spark DataFrame.
        """
        import json

        _schema = self._get_azure_schema()
        # The avroSchema option expects the schema as a JSON string, not a dict.
        _schema_df = spark.read.format("avro").option("avroSchema", json.dumps(_schema)).load(path)
        return _schema_df

    @staticmethod
    def _avro_type_name(avro_type):
        """Return the primitive/complex type name of one Avro field type."""
        if isinstance(avro_type, list):
            # Union: use the first non-null member, e.g. ["null", "string"] -> "string".
            non_null = [member for member in avro_type if member != "null"]
            avro_type = non_null[0] if non_null else "null"
        if isinstance(avro_type, dict):
            # Complex or logical type, e.g. {"type": "array", "items": ...} -> "array".
            avro_type = avro_type.get("type", "")
        return str(avro_type)

    def get_str_schema(self):
        """Return the schema as a DDL-style string, e.g. ``"name STRING, age INT"``.

        Each field contributes ``"<name> <TYPE>"`` with the type upper-cased. The
        fetched schema is not modified.
        """
        _json_schema = self._get_azure_schema()['fields']
        _list_schema = [col['name'] + " " + self._avro_type_name(col['type']).upper()
                        for col in _json_schema]
        print("header count: ", len(_list_schema))
        return ", ".join(_list_schema)


Example:
# Placeholders: replace ``xxx`` with the schema name and define
# ``fully_qualified_namespace`` before running.
AzureSchema = GetSchemaFromAzure(
    schema_name=xxx,
    fully_qualified_namespace=fully_qualified_namespace,
)
AzureSchema.get_str_schema()

Databricks - some useful functions

Function to drop duplicate columns after joining DataFrames


def dropDupeDfCols(df):
    """Drop repeated column names from *df*, keeping the first occurrence of each.

    After a join, a Spark DataFrame can hold two columns with the same name, which
    makes ``df.drop(name)`` ambiguous. The columns are therefore renamed to their
    positions ("0", "1", ...), the duplicate positions are dropped, and the
    surviving columns get their original names back.

    :param df: a Spark DataFrame (anything with ``columns``, ``toDF`` and ``drop``).
    :return: a DataFrame whose column names are unique, in original order.
    """
    # Read once instead of rebuilding the column list on every access.
    columns = df.columns
    seen = set()  # O(1) membership test instead of scanning newcols
    newcols = []
    dupcols = []
    for position, name in enumerate(columns):
        if name in seen:
            dupcols.append(position)
        else:
            seen.add(name)
            newcols.append(name)

    df = df.toDF(*[str(position) for position in range(len(columns))])
    for dupcol in dupcols:
        df = df.drop(str(dupcol))

    return df.toDF(*newcols)

Function to flatten a nested list

def removeNestings(l, output=None):
    """Flatten an arbitrarily nested list into one flat list.

    The original version appended to a global ``output`` that had to exist
    beforehand. The accumulator is now a parameter and is returned.

    :param l: list whose items may themselves be lists, nested to any depth.
    :param output: list to append the flattened items to; a new list is created
        when omitted. Recursive calls share it.
    :return: the list holding the flattened items, in depth-first order.

    >>> removeNestings([1, [2, [3, 4]], 5])
    [1, 2, 3, 4, 5]
    """
    if output is None:
        output = []
    for item in l:
        if isinstance(item, list):
            removeNestings(item, output)
        else:
            output.append(item)
    return output

Function to dynamically add columns with null values

from functools import reduce

def add_col(df, cl):
    """Return *df* with an extra column *cl* that is null in every row.

    Meant to be folded over a list of column names with ``functools.reduce``.
    ``lit('null')`` would store the four-character string "null" rather than a
    null. ``lit(None)`` produces a real null; it is cast to string so the column
    keeps a string type instead of Spark's NullType, which some writers (e.g.
    Parquet) reject.
    """
    return df.withColumn(cl, lit(None).cast('string'))

# Fold add_col over the new column names: each step adds one column to the
# DataFrame produced by the previous step, starting from targetDF.
# NOTE(review): new_col_list and targetDF are not defined in this snippet.
test_df = reduce(add_col, new_col_list, targetDF)

Monday, April 10, 2023

Python - Create a CredFile to store encrypted secrets locally

 from cryptography.fernet import Fernet

import re
import ctypes
import time
import os
import sys


class Credentials():
    """Collect a username/password and save them to ``CredFile.ini``.

    The password is stored as a Fernet token. The key needed to decrypt it is
    written to a separate key file, which is hidden with a leading dot on Linux
    and with the hidden file attribute on Windows.
    """

    def __init__(self):
        self.__username = ""
        self.__key = ""              # Fernet key (bytes) once a password is set
        self.__password = ""         # Fernet token (str) once a password is set
        self.__key_file = 'key.key'  # base name; the platform name comes from _key_file_path()
        self.__time_of_exp = -1      # minutes; -1 means "never expires"

    # ----------------------------------------
    # Getter setter for attributes
    # ----------------------------------------

    @property
    def username(self):
        return self.__username

    @username.setter
    def username(self, username):
        # Keep prompting until a non-blank name is given.
        while username == '':
            username = input('Enter a proper User name, blank is not accepted:')
        self.__username = username

    @property
    def password(self):
        """The encrypted password token (never the plain text)."""
        return self.__password

    @password.setter
    def password(self, password):
        # Every new password gets a freshly generated key.
        self.__key = Fernet.generate_key()
        self.__password = Fernet(self.__key).encrypt(password.encode()).decode()

    @property
    def expiry_time(self):
        return self.__time_of_exp

    @expiry_time.setter
    def expiry_time(self, exp_time):
        # Values below 2 are ignored and the previous value is kept.
        if exp_time >= 2:
            self.__time_of_exp = exp_time

    def _key_file_path(self):
        """Return the key file name for the current platform.

        Derived from the unchanged base name on each call. Previously the stored
        name gained another leading dot on every ``create_cred`` call on Linux.
        """
        if sys.platform == 'linux':
            return '.' + self.__key_file
        return self.__key_file

    def create_cred(self):
        """Write ``CredFile.ini`` and the key file, then clear the secrets.

        ``CredFile.ini`` holds the username, the encrypted password and the expiry
        time. The key file holds the Fernet key. The process exits if the key file
        cannot be written because of a permission error.
        """
        cred_filename = 'CredFile.ini'

        with open(cred_filename, 'w') as file_in:
            file_in.write("#Credential file:\nUsername={}\nPassword={}\nExpiry={}\n"
                          .format(self.__username, self.__password, self.__time_of_exp))
            file_in.write("++" * 20)

        key_file = self._key_file_path()

        # Remove an older key file first. On Windows, opening an existing hidden
        # file with mode 'w' fails with a permission error.
        if os.path.exists(key_file):
            os.remove(key_file)

        try:
            with open(key_file, 'w') as key_in:
                key_in.write(self.__key.decode())
            if sys.platform == 'win32':
                # Attribute value 2 is FILE_ATTRIBUTE_HIDDEN.
                ctypes.windll.kernel32.SetFileAttributesW(key_file, 2)
        except PermissionError:
            # Best-effort cleanup: open() may have failed before the file existed,
            # and removal can hit the same permission problem.
            try:
                os.remove(key_file)
            except OSError:
                pass
            print("A Permission error occurred.\n Please re run the script")
            sys.exit()

        # Drop the secrets from this object now that they are on disk.
        self.__username = ""
        self.__password = ""
        self.__key = ""


def main():
    """Prompt for credentials and an expiry time, then write the credential files.

    If an expiry time was accepted, the companion script ``expire.py`` is started.
    """
    # Creating an object for Credentials class
    creds = Credentials()

    # Accepting credentials
    creds.username = input("Enter UserName:")
    creds.password = input("Enter Password:")
    print("Enter the epiry time for key file in minutes, [default:Will never expire]")
    # Re-prompt on non-numeric input instead of crashing with ValueError.
    while True:
        try:
            creds.expiry_time = int(input("Enter time:") or '-1')
            break
        except ValueError:
            print("Please enter a whole number of minutes.")

    # Write CredFile.ini and the key file.
    creds.create_cred()
    print("**" * 20)
    print("Cred file created successfully at {}"
          .format(time.ctime()))

    if creds.expiry_time != -1:
        if hasattr(os, 'startfile'):
            # Windows: open the script with its associated program.
            os.startfile('expire.py')
        else:
            # os.startfile exists only on Windows; elsewhere run the script
            # with the current interpreter.
            import subprocess
            subprocess.Popen([sys.executable, 'expire.py'])

    print("**" * 20)


if __name__ == "__main__":
    main()

Wednesday, December 7, 2022

Tuesday, June 14, 2022

Python - super()

The Python super() function lets you access methods of a parent class from within a child class. This helps reduce repetition in your code.

One core feature of object-oriented programming languages like Python is inheritance. Inheritance is when a new class uses code from another class to create the new class.

When you’re inheriting classes, you may want to gain access to methods from a parent class. That’s where the super() function comes in.

Here is an example of using super():

class Food():
    """A food item identified by its name."""

    def __init__(self, name):
        self.name = name


class Cheese(Food):
    """A cheese: a Food that also records its brand."""

    def __init__(self, brand, name="Cheese"):
        # super() takes no arguments here, but the parent method called through it
        # still needs its own: Food.__init__ requires ``name``, and omitting it
        # raises TypeError.
        super().__init__(name)
        self.brand = brand

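In Python 3, super() can be called without arguments inside a method: it works out the current class and instance automatically (the older explicit form is super(Cheese, self)). The parent method you call through it still needs its own arguments, though; Food.__init__ requires name.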

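In a multiple-inheritance example such as class Third(First, Second) (not shown here), super() follows the method resolution order (MRO): Third, then First, then Second. If each __init__ calls super().__init__() before doing its own work, Second's __init__ finishes first, then First's.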






Python - Upload file to S3 using pre-signed URL

 

import requests

# API endpoint
# API endpoint that generates the pre-signed upload URL
api_url = "xxxxxx"

reports = "test/"
file_name = "test.csv"
OBJECT_NAME_TO_UPLOAD = "test.csv"

params = {"action": "upload",
          "file_key": reports + file_name,
          "bucket_name": "xxxx"
          }

headers = {"x-api-key": "xxxx"}

session = requests.session()
# Generate pre-signed-url (the parameters are sent as a JSON body on a GET)
response = session.get(api_url, headers=headers, json=params)
# Fail here with the HTTP error rather than later with a KeyError on res['url'].
response.raise_for_status()
res = response.json()

print(res)

# Upload file to S3 using pre-signed URL.
# Read raw bytes: text mode decodes with the locale encoding and translates line
# endings, so the uploaded object could differ from the file on disk.
with open(OBJECT_NAME_TO_UPLOAD, 'rb') as f:
    upload_response = session.put(res['url'], data=f.read())

session.close()

print(f"Upload response: {upload_response.status_code}")

Thursday, March 10, 2022

Python - upload file to S3 using boto3

Get S3 Session


import boto3


def _get_s3_session():
    """Return a boto3 Session built from credentials issued by aws-vault.

    Runs ``aws-vault exec your-aws-role -- env`` and takes AWS_ACCESS_KEY_ID,
    AWS_SECRET_ACCESS_KEY and AWS_SESSION_TOKEN from the printed environment.
    Any of them that is absent is passed as an empty string.

    :raises subprocess.CalledProcessError: if aws-vault exits with a non-zero status.
    """
    import subprocess  # used below but not imported anywhere in the original snippet

    envvars = subprocess.check_output(
        ['aws-vault', 'exec', 'your-aws-role', '--', 'env'])

    creds = {
        'AWS_ACCESS_KEY_ID': '',
        'AWS_SECRET_ACCESS_KEY': '',
        'AWS_SESSION_TOKEN': '',
    }
    for envline in envvars.split(b'\n'):
        # Split on the first '=' only; values may themselves contain '='.
        key, sep, value = envline.decode('utf8').partition('=')
        if sep and key in creds:
            creds[key] = value

    return boto3.Session(
        aws_access_key_id=creds['AWS_ACCESS_KEY_ID'],
        aws_secret_access_key=creds['AWS_SECRET_ACCESS_KEY'],
        aws_session_token=creds['AWS_SESSION_TOKEN'],
    )

Upload files to S3

class ProgressPercentage(object):
    """Progress callback for boto3 transfers that prints a one-line meter.

    Each call adds the number of bytes just transferred to a running total and
    rewrites the current stdout line (via a carriage return) as
    ``"<prefix> <sent> / <size> (<pct>%)"``.
    """

    def __init__(self, filename, size=None, prefix_str=''):
        """
        :param filename: local file being transferred.
        :param size: total size in bytes; defaults to the file's size on disk.
        :param prefix_str: text printed before the counters.
        """
        self._filename = filename
        if size is None:
            self._size = float(os.path.getsize(filename))
        else:
            self._size = size
        self._prefix_str = prefix_str
        self._seen_so_far = 0
        # The lock guards the running total in case the callback is invoked
        # from several transfer threads.
        self._lock = threading.Lock()

    def __call__(self, bytes_amount):
        # To simplify we'll assume this is hooked up
        # to a single filename.
        with self._lock:
            self._seen_so_far += bytes_amount
            # An empty file would otherwise raise ZeroDivisionError; report it as done.
            percentage = (self._seen_so_far / self._size) * 100 if self._size else 100.0
            sys.stdout.write(
                "\r%s %s / %s (%.2f%%)" % (
                    self._prefix_str,
                    self._seen_so_far,
                    self._size,
                    percentage))
            sys.stdout.flush()
def _upload_to_s3(self, **kwargs):
    """Upload a local file to S3 and show a progress meter.

    NOTE(review): this takes ``self`` but its enclosing class is not shown in the
    post; it only stores the upload parameters on ``self``.

    :keyword path: local path of the file to upload (required).
    :keyword bucket_name: destination bucket (required).
    :keyword object_name: destination key; defaults to the file's base name.
    """
    # .get(): object_name is optional, so omitting it now means "use the file
    # name" instead of raising KeyError.
    self._object_name = kwargs.get('object_name')
    self._file_path = kwargs['path']
    self._bucket_name = kwargs['bucket_name']

    # If S3 object_name was not specified, use file_name
    if self._object_name is None:
        self._object_name = ntpath.basename(self._file_path)
    # Upload the file
    session = _get_s3_session()
    s3_client = session.client("s3")

    # Perform the transfer. The original ``try`` had no except/finally clause
    # (a SyntaxError), so errors now simply propagate to the caller.
    s3_client.upload_file(
        self._file_path,
        self._bucket_name,
        self._object_name,
        Callback=ProgressPercentage(
            self._file_path,
            prefix_str='Uploading file {:} -> {:}.{:}: '.format(
                os.path.dirname(self._file_path), self._bucket_name, self._object_name)),
    )
    # Finish the progress line, which is drawn with carriage returns.
    sys.stdout.write("\n")


Python - Best practice for Python main function definition and program start/exit

exit()

exit() immediately ends the execution of the Python script; any code after the exit() call won’t be executed.

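You can think of exit() as an alias for quit() (or vice versa) in Python. They exist to make the interactive interpreter more user-friendly. However, it is not good practice to use quit() or exit() in production code: both are added by the site module and are not guaranteed to exist (for example, when Python runs with the -S option). Reserve them for the interactive interpreter and use sys.exit() in programs.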

sys.exit(main())

This will call the function main() and when main finishes, it will exit giving the system the return code that is the result of main().

A simplified example where this might be used:

def main():
    """Run the program and return a process exit status: 0 on success, 1 on error."""
    try:
        doSomething()
    except Exception:
        # ``except Exception`` instead of a bare ``except:``, so KeyboardInterrupt
        # and SystemExit still stop the program instead of becoming status 1.
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(main())

exit() vs. sys.exit()

import sys
def foo():
    """Show that sys.exit() raises SystemExit, which an except clause can catch."""
    try:
        print("Demonstrating sys.exit()")
        sys.exit()
        # Never reached: sys.exit() has already raised SystemExit.
        print("This line will not be executed!")
    except SystemExit:
        # Caught here, so the interpreter keeps running after foo() returns.
        print("Abnormal Termination! Encountered SystemExit")
def func():
    """Show that exit() ends execution immediately; nothing after it runs."""
    print("Demonstrating exit()")
    # Printing the exit object (not calling it) shows its usage hint.
    print(exit)
    exit()  # raises SystemExit; the next line is never reached
    print("The previous command executes ignoring all the lines after the command")
# Call both functions:
foo()
print()
# func() calls exit(), which raises SystemExit, so the script ends inside it.
func()

Result:

Demonstrating sys.exit()
Abnormal Termination! Encountered SystemExit

Demonstrating exit()
Use exit() or Ctrl-D (i.e. EOF) to exit

When you need more control, calling sys.exit() explicitly is a good idea, and it is the approach to adopt going forward. But don’t only think about scripts that are run directly; also think about installed packages. Setuptools has a cross-platform mechanism for defining functions as entry points for scripts. If you have this in your setup.py:

from setuptools import setup

setup(
    name='scraper',
    version='1.0.0',
    install_requires=[
        'boto3>=1.17.17',
        'keyring>=22.3.0',
        # "bs4" on PyPI is only a placeholder package that pulls in Beautiful
        # Soup; depend on the real distribution name directly.
        'beautifulsoup4',
        'requests_ntlm'
    ],
    entry_points={
        'console_scripts': [
            # Installs a ``scraper`` command that calls scraper.__main__.main();
            # its return value becomes the process exit status.
            'scraper=scraper.__main__:main'
        ]
    },
)

and install the package, you can run the scraper command from the command line:

pip install -e ./
scraper  -c conf/conf_upload.json 

A template for the code structure:

import sys
from argparse import ArgumentParser, Namespace
from typing import Dict, List
import yaml # just used as an example here for loading more configs, optional


def parse_arguments(cli_args: List[str] = None) -> Namespace:
    """Parse command-line options into a Namespace.

    :param cli_args: arguments to parse; ``None`` falls back to ``sys.argv[1:]``.
    :return: the parsed options.
    """
    parser = ArgumentParser()
    # Register options here, e.g. parser.add_argument(...)
    namespace = parser.parse_args(args=cli_args)
    return namespace


def load_configs(args: Namespace) -> Dict:
    """Load the YAML configuration file named by ``args.config_path``.

    A missing or unreadable file is reported on stderr and an empty default
    mapping is returned. Other errors, such as malformed YAML, propagate to the
    caller. The old ``err == "Really Bad"`` check could never be true, because it
    compared an exception with a string.

    :param args: parsed arguments providing ``config_path``.
    :return: the configuration mapping (empty for an empty file).
    """
    try:
        with open(args.config_path, 'r') as file_pointer:
            config = yaml.safe_load(file_pointer)
    except OSError as err:
        # log errors
        print(err, file=sys.stderr)
        # Fall back to sane defaults; a dict to match the declared return type.
        sane_defaults = {}
        return sane_defaults

    # arrange and check configs here

    # safe_load returns None for an empty file; normalise to an empty mapping.
    return config if config is not None else {}


def main(args: "Namespace | None" = None) -> int:
    """Program entry point; returns the process exit status.

    :param args: already-parsed arguments. When omitted (as when a setuptools
        console script calls ``main()``), the command line is parsed here at call
        time, not in a default value that would be evaluated once, at import.
    :return: 0 on success, 1 on manual abort or on an unhandled error.
    """
    if args is None:
        args = parse_arguments()
    try:
        # maybe load some additional config files here or in a function called here
        # e.g. args contains a path to a config folder; or use sane defaults
        # if the config files are missing(if that is your desired behavior)
        config = load_configs(args)
        do_real_work(args, config)

    except KeyboardInterrupt:
        print("Aborted manually.", file=sys.stderr)
        return 1

    except Exception as err:
        # (in real code the `except` would probably be less broad)
        # Turn exceptions into appropriate logs and/or console output.

        # log err
        print("An unhandled exception crashed the application!", err)

        # non-zero return code to signal error
        # Can of course be more fine grained than this general
        # "something went wrong" code.
        return 1

    return 0  # success


# __main__ support is still here to make this file executable without
# installing the package first.
if __name__ == "__main__":
    sys.exit(main(parse_arguments()))

Another example:

import sys
import importlib
import argparse
import json


def main(argv=None):
    """Run the processing pipeline described by a JSON config file.

    Stages in the config's ``pipeline`` list run in order. For each stage, the
    module ``scraper.<stage['module']>`` is imported and its ``run`` function is
    called with ``stage['params']`` as keyword arguments.

    :param argv: arguments to parse; ``None`` (the default) means ``sys.argv[1:]``.
    :raises FileNotFoundError: if the config file does not exist.
    """
    import os  # used below but missing from this example's imports

    # Default config: conf/conf_upload.json inside the parent of this file's directory.
    project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    conf_default_path = os.path.join(project_root, 'conf', 'conf_upload.json')
    parser = argparse.ArgumentParser(description='Transform files from/to an AWS-S3 object.')
    parser.add_argument("-v", "--verbose",
                        help="increase output verbosity",
                        action="store_true")
    parser.add_argument("-c", "--conf-path",
                        help="path to pipeline json config file",
                        default=conf_default_path,
                        type=str, )

    args = parser.parse_args(argv)
    if not os.path.exists(args.conf_path):
        # FileNotFoundError subclasses OSError (a.k.a. IOError), so existing
        # ``except IOError`` handlers still catch it.
        raise FileNotFoundError('Pipeline JSON config file {:} does not exist!'.format(args.conf_path))

    with open(args.conf_path) as f:
        conf = json.load(f)

    for stage in conf['pipeline']:
        # import module
        module = importlib.import_module('scraper.' + stage['module'])

        # run module
        module.run(**stage['params'])


if __name__ == '__main__':
    sys.exit(main())

Wednesday, March 2, 2022

Azure Function - Schema Registry

 import os

from azure.schemaregistry import SchemaRegistryClient
from azure.identity import ClientSecretCredential


FULLY_QUALIFIED_NAMESPACE = "xxxxxx.servicebus.windows.net"
EVENT_HUB_NAME = "xxxxxx"
EVENT_HUB_POLICY_NAME = "xxxxxx"
EVENT_HUB_POLICY_PRIMARY_KEY = "xxxxxx"
SCHEMA_REGISTRY_GROUP_NAME = "xxxxxx"
AZURE_TENANT_ID = "xxxxxx"
AZURE_CLIENT_ID = "xxxxxx"
AZURE_CLIENT_SECRET = "xxxxxx"
# Schema serialization format, used for both registration and lookup. It is
# named SCHEMA_FORMAT rather than ``format`` so the built-in format() is not shadowed.
SCHEMA_FORMAT = "Avro"

# Avro record schema to register.
definition = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}]
}"""

token_credential = ClientSecretCredential(tenant_id=AZURE_TENANT_ID,
                                          client_id=AZURE_CLIENT_ID,
                                          client_secret=AZURE_CLIENT_SECRET)

schema_registry_client = SchemaRegistryClient(FULLY_QUALIFIED_NAMESPACE, token_credential)
with schema_registry_client:
    schema_registry_client.register_schema(SCHEMA_REGISTRY_GROUP_NAME, "TEST",
                                           definition, SCHEMA_FORMAT)
    schema_properties = schema_registry_client.get_schema_properties(
        group_name=SCHEMA_REGISTRY_GROUP_NAME,
        name="TEST",
        definition=definition,
        format=SCHEMA_FORMAT
    )
    schema_id = schema_properties.id
    # Fetch the definition while the client is still open; leaving the ``with``
    # block closes it.
    schema_definition = schema_registry_client.get_schema(schema_id).definition


print(f'schema_id: {schema_id}')
print(f'schema_format: {schema_properties.format}')

print('schema: ', schema_definition)
print('SCHEMA_REGISTRY_GROUP_NAME: ', SCHEMA_REGISTRY_GROUP_NAME)
print('SCHEMA_REGISTRY_GROUP_NAME: ', SCHEMA_REGISTRY_GROUP_NAME)

Azure Function - Manage Event Hub

 import os

from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import ClientSecretCredential

from azure.mgmt.eventhub import EventHubManagementClient
from azure.common.credentials import ServicePrincipalCredentials


FULLY_QUALIFIED_NAMESPACE = "xxxxxxxx.servicebus.windows.net"
EVENT_HUB_NAME = "xxxxxxxx"
EVENT_HUB_POLICY_NAME = "xxxxxxxx"
EVENT_HUB_POLICY_PRIMARY_KEY = "xxxxxxxx"
SCHEMA_REGISTRY_GROUP_NAME = "xxxxxxxx"
AZURE_TENANT_ID = "9 xxxxxxxx"
AZURE_CLIENT_ID = "xxxxxxxx"
AZURE_CLIENT_SECRET = "xxxxxxxx"

STORAGE_ACCOUNT_NAME = 'xxxxxxxx'
SUBSCRIPTION_ID = 'xxxxxxxx'
RESOURCE_GROUP_NAME = 'xxxxxxxx'
EVENTHUB_NAMESPACE = 'xxxxxxxx'

token_credential = ClientSecretCredential(tenant_id=AZURE_TENANT_ID,
                                          client_id=AZURE_CLIENT_ID,
                                          client_secret=AZURE_CLIENT_SECRET)
eventhub_client = EventHubManagementClient(token_credential, SUBSCRIPTION_ID)

EVENTHUB_NAME = 'xxxxxxxx'

# Create EventHub, with Capture writing Avro files to the storage account below.
print(" Create event hub...")
eventhub = eventhub_client.event_hubs.create_or_update(
    RESOURCE_GROUP_NAME,
    EVENTHUB_NAMESPACE,
    EVENTHUB_NAME,
    {
        "message_retention_in_days": 1,
        "partition_count": 2,
        "status": "Active",
        "capture_description": {
            "enabled": True,
            "encoding": "Avro",
            "interval_in_seconds": 120,
            "size_limit_in_bytes": 10485763,
            "destination": {
                "name": "EventHubArchive.AzureBlockBlob",
                "storage_account_resource_id": (
                    f"/subscriptions/{SUBSCRIPTION_ID}/resourceGroups/{RESOURCE_GROUP_NAME}"
                    f"/providers/Microsoft.Storage/storageAccounts/{STORAGE_ACCOUNT_NAME}"
                ),
                "blob_container": "container",
                # Must be one string: the original literal was split across two
                # lines, which is a SyntaxError.
                "archive_name_format": "{Namespace}/{EventHub}/{PartitionId}/{Year}/{Month}/{Day}/{Hour}/{Minute}/{Second}"
            }
        }
    }
)
print("Created EventHub: {}".format(eventhub))

# Create authorization rule (send-only access for the Azure Function)
eventhub_rule = eventhub_client.event_hubs.create_or_update_authorization_rule(
    resource_group_name=RESOURCE_GROUP_NAME,
    namespace_name=EVENTHUB_NAMESPACE,
    event_hub_name=EVENTHUB_NAME,
    authorization_rule_name="AzureFunctionSendPolicy",
    parameters={"rights": ["SEND"]}
)

# Get authorization rule
eventhub_rule2 = eventhub_client.event_hubs.get_authorization_rule(
    resource_group_name=RESOURCE_GROUP_NAME,
    namespace_name=EVENTHUB_NAMESPACE,
    event_hub_name=EVENTHUB_NAME,
    authorization_rule_name="AzureFunctionSendPolicy"
)
print("get_authorization_rule() for manager for EventHub: {}\n".format(eventhub_rule2))

# List keys of the event hub's authorization rule
namespace_keys = eventhub_client.event_hubs.list_keys(
    resource_group_name=RESOURCE_GROUP_NAME,
    namespace_name=EVENTHUB_NAMESPACE,
    event_hub_name=EVENTHUB_NAME,
    authorization_rule_name="AzureFunctionSendPolicy"
)
print("list_keys() for EventHub: {}\n".format(namespace_keys))
print("namespace_keys.primary_key:", namespace_keys.primary_key)

Tuesday, December 14, 2021

Pagerduty - event API - trigger/resolve incident

class PagerDutyAlert(object):
    """Trigger and resolve PagerDuty incidents through the Events API v2."""

    # Events API v2 endpoint, used for both trigger and resolve events.
    EVENTS_URL = 'https://events.pagerduty.com/v2/enqueue'

    def __init__(self, routing_key=None, from_email=None):
        """
        :param routing_key: integration (routing) key of the target PagerDuty service.
        :param from_email: value for the ``From`` header of trigger requests
            (omitted from the request when None).
        """
        super(PagerDutyAlert, self).__init__()
        # Set up front, not inside _trigger_incident, so _resolve_incident has a
        # routing key even if this instance never triggered an incident.
        self.routing_key = routing_key
        self.FROM = from_email

    def _trigger_incident(self, alert):
        """Trigger an incident via the Events API v2 and return the decoded reply.

        :param alert: mapping with the keys ``Source``, ``Service``,
            ``WorkflowName``, ``WorkflowStatus``, ``Level``, ``Environment``,
            ``Team``, ``Summary`` and ``Severity``; a missing key raises KeyError.
        :return: the JSON response body. NOTE(review): PagerDuty is expected to
            include the ``dedup_key`` that ``_resolve_incident`` needs.
        """
        self._alert = alert

        self._source = alert['Source']
        self._service = alert['Service']
        self._workflow_name = alert['WorkflowName']
        self._workflow_status = alert['WorkflowStatus']
        self._workflow_level = alert['Level']
        self._workflow_env = alert['Environment']
        self._team = alert['Team']
        self._summary = alert['Summary']
        self._severity = alert['Severity']

        headers = {
            'Content-Type': 'application/json',
            'Accept': 'application/vnd.pagerduty+json;version=2',
            'From': self.FROM
        }

        custom_details = {"Source": self._source,
                          "Service": self._service,
                          "WorkflowName": self._workflow_name,
                          "Summary": self._summary,
                          "Level": self._workflow_level,
                          "Environment": self._workflow_env,
                          "Team": self._team,
                          "Time": str(int(time.time()))
                          }
        obj = {
            'routing_key': self.routing_key,
            "event_action": "trigger",
            "payload": {
                "summary": "Workflow: " + self._workflow_name + " Failed!",
                "custom_details": custom_details,
                "severity": self._severity,
                "source": self._source
            }
        }
        r = requests.post(self.EVENTS_URL, headers=headers, data=json.dumps(obj))

        print('Status Code: {code}'.format(code=r.status_code))
        result = r.json()
        print(result)
        return result

    def _resolve_incident(self, dedup_key):
        """Resolve the incident identified by *dedup_key*.

        Prints 'Incident Resolved ' on success, otherwise the raw response body.
        """
        header = {
            "Content-Type": "application/json"
        }

        payload = {
            "routing_key": self.routing_key,
            "event_action": "resolve",
            "dedup_key": dedup_key
        }

        response = requests.post(self.EVENTS_URL,
                                 data=json.dumps(payload),
                                 headers=header)

        if response.json().get("status") == "success":
            print('Incident Resolved ')
        else:
            print(response.text)  # print error message if not successful

    def _run(self, alert):
        """Trigger an incident for *alert*; raise ValueError if it is empty."""
        if not alert:
            raise ValueError("invalid json_params!")
        self._trigger_incident(alert)

    def run_job(self, string_params):
        """Command-line entry point.

        :param string_params: a dict, or a JSON string that decodes to one; its
            keys become keyword arguments of ``_run``, i.e. ``{"alert": {...}}``.
        """
        print("string_params", string_params)
        conf = string_params if isinstance(string_params, dict) else json.loads(string_params)
        self._run(**conf)


if __name__ == '__main__':
    # Expose the instance's methods (e.g. run_job) as CLI commands via python-fire.
    # NOTE(review): ``fire`` is not imported in this snippet.
    fire.Fire(PagerDutyAlert())