Wednesday, July 31, 2024
Tableau - trigger starting extract job
Tuesday, October 17, 2023
Python - get folder size from s3 bucket
import boto3
Wednesday, September 27, 2023
Python - Finding Correlation Between Many Variables (Multidimensional Dataset) with Python
import numpy as np
import pandas as pd
import matplotlib
matplotlib.use('TkAgg')
import matplotlib.pyplot as plt
data = pd.read_csv('/Users/Downloads/test.csv', index_col=0)
corr = data.corr()
fig = plt.figure()
ax = fig.add_subplot(111)
cax = ax.matshow(corr, cmap='coolwarm', vmin=-1, vmax=1)
fig.colorbar(cax)
ticks = np.arange(0, len(data.columns), 1)
ax.set_xticks(ticks)
plt.xticks(rotation=90)
ax.set_yticks(ticks)
ax.set_xticklabels(data.columns)
ax.set_yticklabels(data.columns)
plt.show()
Wednesday, June 28, 2023
Azure - functions to get schema from schema registry using API
import requests
Databricks - some useful functions
Function to drop duplicate columns when join different dataframes
Function to remove nested list
Function to dynamic add columns with null value
Monday, April 10, 2023
Python - Create CredFile to encode secrets locally
from cryptography.fernet import Fernet
import re
import ctypes
import time
import os
import sys
class Credentials():
def __init__(self):
self.__username = ""
self.__key = ""
self.__password = ""
self.__key_file = 'key.key'
self.__time_of_exp = -1
# ----------------------------------------
# Getter setter for attributes
# ----------------------------------------
@property
def username(self):
return self.__username
@username.setter
def username(self, username):
while (username == ''):
username = input('Enter a proper User name, blank is not accepted:')
self.__username = username
@property
def password(self):
return self.__password
@password.setter
def password(self, password):
self.__key = Fernet.generate_key()
f = Fernet(self.__key)
self.__password = f.encrypt(password.encode()).decode()
del f
@property
def expiry_time(self):
return self.__time_of_exp
@expiry_time.setter
def expiry_time(self, exp_time):
if (exp_time >= 2):
self.__time_of_exp = exp_time
def create_cred(self):
"""
This function is responsible for encrypting the password and create key file for
storing the key and create a credential file with user name and password
"""
cred_filename = 'CredFile.ini'
with open(cred_filename, 'w') as file_in:
file_in.write("#Credential file:\nUsername={}\nPassword={}\nExpiry={}\n"
.format(self.__username, self.__password, self.__time_of_exp))
file_in.write("++" * 20)
# If there exists an older key file, This will remove it.
if (os.path.exists(self.__key_file)):
os.remove(self.__key_file)
# Open the Key.key file and place the key in it.
# The key file is hidden.
try:
os_type = sys.platform
if (os_type == 'linux'):
self.__key_file = '.' + self.__key_file
with open(self.__key_file, 'w') as key_in:
key_in.write(self.__key.decode())
# Hidding the key file.
# The below code snippet finds out which current os the script is running on and does the task base on it.
if (os_type == 'win32'):
ctypes.windll.kernel32.SetFileAttributesW(self.__key_file, 2)
else:
pass
except PermissionError:
os.remove(self.__key_file)
print("A Permission error occurred.\n Please re run the script")
sys.exit()
self.__username = ""
self.__password = ""
self.__key = ""
self.__key_file
def main():
# Creating an object for Credentials class
creds = Credentials()
# Accepting credentials
creds.username = input("Enter UserName:")
creds.password = input("Enter Password:")
print("Enter the epiry time for key file in minutes, [default:Will never expire]")
creds.expiry_time = int(input("Enter time:") or '-1')
# calling the Credit
creds.create_cred()
print("**" * 20)
print("Cred file created successfully at {}"
.format(time.ctime()))
if not (creds.expiry_time == -1):
os.startfile('expire.py')
print("**" * 20)
if __name__ == "__main__":
main()
Wednesday, December 7, 2022
Python - Paramiko- How to SSH and transfer files with python
You can transfer files from the remote machine to the local or vice versa using SFTP (Secure File Transfer Protocol) and SCP(Secure Copy Protocol).
https://medium.com/@keagileageek/paramiko-how-to-ssh-and-file-transfers-with-python-75766179de73
Tuesday, June 14, 2022
Python - super()
The Python super() method lets you access methods from a parent class from within a child class. This helps reduce repetition in your code.
One core feature of object-oriented programming languages like Python is inheritance. Inheritance is when a new class uses code from another class to create the new class.
When you’re inheriting classes, you may want to gain access to methods from a parent class. That’s where the super() function comes in.
Here is the syntax for the super() method:
class Food():
def __init__(self, name):
self.name = name
class Cheese(Food):
def __init__(self, brand):
super().__init__()
self.brand = brandThe super() method does not accept any arguments.
Whatever comes last in Third() is the parent class. so when calling Third(), it will call Second() first, then First().
Python - Upload file to S3 using pre-signed URL
import requests
# API endpoint
api_url = "xxxxxx"
reports = "test/"
file_name = "test.csv"
OBJECT_NAME_TO_UPLOAD = "test.csv"
params = {"action": "upload",
"file_key": reports + file_name,
"bucket_name": "xxxx"
}
headers = {"x-api-key": "xxxx"}
session = requests.session()
# Generate pre-signed-url
response = session.get(api_url, headers=headers, json=params)
res = response.json()
print(res)
# # # # # Upload file to S3 using pre-signed URL
with open(OBJECT_NAME_TO_UPLOAD) as f:
upload_response = session.request('PUT', res['url'], data=f.read().encode('utf-8'))
print(f"Upload response: {upload_response.status_code}")
Thursday, March 10, 2022
Python - upload file to S3 using boto3
Get S3 Session
import boto3
def _get_s3_session():
envvars = subprocess.check_output(
['aws-vault', 'exec', 'your-aws-role', '--', 'env'])
aws_access_key_id = ''
aws_secret_access_key = ''
aws_session_token = ''
for envline in envvars.split(b'\n'):
line = envline.decode('utf8')
eqpos = line.find('=')
if eqpos < 4:
continue
k = line[0:eqpos]
v = line[eqpos + 1:]
if k == 'AWS_ACCESS_KEY_ID':
aws_access_key_id = v
if k == 'AWS_SECRET_ACCESS_KEY':
aws_secret_access_key = v
if k == 'AWS_SESSION_TOKEN':
aws_session_token = v
session = boto3.Session(aws_access_key_id, aws_secret_access_key, aws_session_token)
return session
Upload files to S3
class ProgressPercentage(object):
def __init__(self, filename, size=None, prefix_str=''):
self._filename = filename
if size is None:
self._size = float(os.path.getsize(filename))
else:
self._size = size
self._prefix_str = prefix_str
self._seen_so_far = 0
self._lock = threading.Lock()
def __call__(self, bytes_amount):
# To simplify we'll assume this is hooked up
# to a single filename.
with self._lock:
self._seen_so_far += bytes_amount
percentage = (self._seen_so_far / self._size) * 100
sys.stdout.write(
"\r%s %s / %s (%.2f%%)" % (
self._prefix_str,
self._seen_so_far,
self._size,
percentage))
sys.stdout.flush()
def _upload_to_s3(self, **kwargs):
start = time.time()
self._object_name = kwargs['object_name']
self._file_path = kwargs['path']
self._bucket_name = kwargs['bucket_name']
# If S3 object_name was not specified, use file_name
if self._object_name is None:
self._object_name = ntpath.basename(self._file_path )
# Upload the file
session = _get_s3_session()
s3_client = session.client("s3")
try:
# Perform the transfer
s3_client.upload_file(
self._file_path,
self._bucket_name,
self._object_name,
Callback=ProgressPercentage(
self._file_path,
prefix_str='Uploading file {:} -> {:}.{:}: '.format(
os.path.dirname(self._file_path), self._bucket_name, self._object_name)),
)
Python - Best practice for Python main function definition and program start/exit
exit()
The exit() will straightforwardly end the execution of the Python code/script. The code that follows thereafter the exit() command won’t be executed.
You can think of exit() as an alias for quit() (or vice-versa) in Python. They simply co-exist to make Python more user-friendly. However, it is not a good practice to use quit() and exit() in production code and should only be reserved for use in the interpreter.
sys.exit(main())
This will call the function main() and when main finishes, it will exit giving the system the return code that is the result of main().
A simplified example where this might be used:
def main():
try:
doSomething()
return 0
except:
return 1
if __name__ == "__main__":
sys.exit(main())
exit() vs. sys.exit()
import sys
def foo():
try:
print("Demonstrating sys.exit()")
sys.exit()
print("This line will not be executed!")
except SystemExit:
# argument denoting exit status
print("Abnormal Termination! Encountered SystemExit")
def func():
print("Demonstrating exit()")
print(exit)
exit() # lines after this statement are ignored and are not executed
print("The previous command executes ignoring all the lines after the command")
# Calling both the function:
foo()
print()
func()
Result:
Demonstrating sys.exit()
Abnormal Termination! Encountered SystemExit
Demonstrating exit()
Use exit() or Ctrl-D (i.e. EOF) to exit
When you need something more fancy working with sys.exit() explicitly is a good idea. That’s basically your future approach. But don’t only think of scripts called directly, also think of installed packages. Setuptools has a cross-platform mechanism to define functions as entry points for scripts. If you have this in your setup.py:
from setuptools import setup
setup(
name='scraper',
version='1.0.0',
install_requires=[
'boto3>=1.17.17',
'keyring>=22.3.0',
'bs4',
'requests_ntlm'
],
entry_points={
'console_scripts': [
'scraper=scraper.__main__:main'
]
},
)
and install that package you can run your_script from the command line.
pip install -e ./scraper -c conf/conf_upload.json Template of code structure would be:
import sys
from argparse import ArgumentParser, Namespace
from typing import Dict, List
import yaml # just used as an example here for loading more configs, optional
def parse_arguments(cli_args: List[str] = None) -> Namespace:
parser = ArgumentParser()
# parser.add_argument()
# ...
return parser.parse_args(args=cli_args) # None defaults to sys.argv[1:]
def load_configs(args: Namespace) -> Dict:
try:
with open(args.config_path, 'r') as file_pointer:
config = yaml.safe_load(file_pointer)
# arrange and check configs here
return config
except Exception as err:
# log errors
print(err)
if err == "Really Bad":
raise err
# potentionally return some sane fallback defaults if desired/reasonable
sane_defaults = []
return sane_defaults
def main(args: Namespace = parse_arguments()) -> int:
try:
# maybe load some additional config files here or in a function called here
# e.g. args contains a path to a config folder; or use sane defaults
# if the config files are missing(if that is your desired behavior)
config = load_configs(args)
do_real_work(args, config)
except KeyboardInterrupt:
print("Aborted manually.", file=sys.stderr)
return 1
except Exception as err:
# (in real code the `except` would probably be less broad)
# Turn exceptions into appropriate logs and/or console output.
# log err
print("An unhandled exception crashed the application!", err)
# non-zero return code to signal error
# Can of course be more fine grained than this general
# "something went wrong" code.
return 1
return 0 # success
# __main__ support is still here to make this file executable without
# installing the package first.
if __name__ == "__main__":
sys.exit(main(parse_arguments()))
Another example,
import sys
import importlib
import argparse
import json
def main():
conf_default_path = '{:}/conf/conf_upload.json'.format(os.path.abspath(__file__ + "/../../"))
parser = argparse.ArgumentParser(description='Transform files from/to an AWS-S3 object.')
parser.add_argument("-v", "--verbose",
help="increase output verbosity",
action="store_true")
parser.add_argument("-c", "--conf-path",
help="path to pipeline json config file",
default=conf_default_path,
type=str, )
args = parser.parse_args()
if not os.path.exists(args.conf_path):
raise IOError('Pipeline JSON config file {:} does not exist!'.format(args.conf_path))
with open(args.conf_path) as f:
conf = json.load(f)
for stage in conf['pipeline']:
# import module
module = importlib.import_module('scraper.' + stage['module'])
# run module
module.run(**stage['params'])
if __name__ == '__main__':
sys.exit(main())
Wednesday, March 2, 2022
Azure Function - Schema Registry
import os
Azure Function - Manage Event Hub
import os
Tuesday, December 14, 2021
Pagerduty - event API - trigger/resolve incident
class PagerDutyAlert(object):
def __init__(self):
super(PagerDutyAlert, self).__init__()
def _trigger_incident(self, alert):
self._alert = alert
self._source = self._alert['Source']
self._service = self._alert['Service']
self._workflow_name = self._alert['WorkflowName']
self._workflow_status = self._alert['WorkflowStatus']
self._workflow_level = self._alert['Level']
self._workflow_env = self._alert['Environment']
self._team = self._alert['Team']
self._summary = self._alert['Summary']
self._severity = self._alert['Severity']
"""Triggers an incident via the V2 REST API."""
self.routing_key = xxxxxxxx
self.FROM = xxxxe-mailxxxxx
headers = {
'Content-Type': 'application/json',
'Accept': 'application/vnd.pagerduty+json;version=2',
'From': self.FROM
}
url = 'https://events.pagerduty.com/v2/enqueue'
custom_details = {"Source": self._source,
"Service": self._service,
"WorkflowName": self._workflow_name,
"Summary": self._summary,
"Level": self._workflow_level,
"Environment": self._workflow_env,
"Team": self._team,
"Time": str(int(time.time()))
}
obj = {
'routing_key': self.routing_key,
"event_action": "trigger",
"payload": {
"summary": "Workflow: " + self._workflow_name + " Failed!",
"custom_details": custom_details,
"severity": self._severity,
"source": self._source
}
}
r = requests.post(url, headers=headers, data=json.dumps(obj))
print('Status Code: {code}'.format(code=r.status_code))
print(r.json())
def _resolve_incident(self, dedup_key):
header = {
"Content-Type": "application/json"
}
payload = {
"routing_key": self.routing_key,
"event_action": "resolve",
"dedup_key": dedup_key
}
response = requests.post('https://events.pagerduty.com/v2/enqueue',
data=json.dumps(payload),
headers=header)
if response.json()["status"] == "success":
print('Incident Resolved ')
else:
print(response.text) # print error message if not successful
def _run(self, alert):
if alert:
self._trigger_incident(alert)
else:
raise ValueError("invalid json_params!")
def run_job(self, string_params):
print("string_params", string_params)
if type(string_params) is dict:
conf = string_params
else:
conf = json.loads(string_params)
self._run(**conf)
if __name__ == '__main__':
fire.Fire(PagerDutyAlert())

