# Reference: https://alexwlchan.net/2017/07/listing-s3-keys/
import os
from glob import glob

try:
    import boto3
except ImportError:
    # boto3 is only needed by the S3 lookups; keeping the import optional lets
    # the local lookup work on machines without the AWS SDK installed.
    boto3 = None


class NoVersionsFoundError(KeyError, IndexError):
    """Raised when a lookup finds no version to return.

    Subclasses both ``KeyError`` and ``IndexError`` because those are the two
    exceptions earlier revisions leaked for "nothing found" (a missing
    ``'Contents'`` entry or an empty list), so existing handlers keep working.
    """

    # KeyError.__str__ shows repr(message); use the plain message instead.
    __str__ = Exception.__str__


def _s3_client():
    """Create an S3 client, failing clearly when boto3 is not installed."""
    if boto3 is None:
        raise ImportError("boto3 is required for S3 version lookups")
    return boto3.client('s3')


def _latest(candidates, source):
    """Return the greatest candidate string, or raise if there is none."""
    latest = max(candidates, default=None)
    if latest is None:
        raise NoVersionsFoundError(
            "no versions found in {!r}".format(source))
    return latest


class Versions:
    """Gets the latest version from local or s3.

    All lookups are static: call them on the class or on an instance.
    Versions are compared as plain strings, so "latest" means the
    lexicographically greatest name (``"v10"`` sorts below ``"v9"``).
    """

    @staticmethod
    def get_latest_version_from_local(path):
        """Return the greatest entry name directly under a local directory.

        Args:
            path: Directory whose immediate children are the versions.

        Returns:
            The lexicographically greatest child name (not the full path).

        Raises:
            NoVersionsFoundError: If ``path`` has no (non-hidden) children.
        """
        # os.path.basename copes with the backslash separators glob returns
        # on Windows, which splitting on "/" did not.
        names = [
            os.path.basename(entry.rstrip('/'))
            for entry in glob(path + "/*", recursive=True)
        ]
        return _latest(names, path)

    @staticmethod
    def get_latest_version_from_s3(bucket_name, path):
        """Return the greatest version name stored under an S3 prefix.

        The last component of ``path`` is used as the key prefix, and the
        second ``/``-separated component of every matching key is taken as
        its version.

        Args:
            bucket_name: Name of the S3 bucket to list.
            path: Path whose last component is the key prefix.

        Returns:
            The lexicographically greatest version name.

        Raises:
            NoVersionsFoundError: If no key under the prefix has a version
                component.
            ImportError: If boto3 is not installed.
        """
        prefix = path.rstrip('/').split("/").pop()
        paginator = _s3_client().get_paginator('list_objects_v2')

        # S3 lists keys in ascending order, so a single capped request would
        # drop exactly the highest versions; walk every page instead.
        versions = set()
        for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
            # 'Contents' is absent when a page holds no objects.
            for obj in page.get('Contents', []):
                parts = obj['Key'].rstrip('/').split("/")
                # Skip keys without a version component, e.g. the "prefix/"
                # folder placeholder object.
                if len(parts) > 1:
                    versions.add(parts[1])
        return _latest(versions, "s3://{}/{}".format(bucket_name, prefix))

    @staticmethod
    def get_all_s3_keys(bucket_name, name_filter='well_production'):
        """Return the greatest matching key in an S3 bucket.

        Despite its name, this returns a single key: the lexicographically
        greatest key that contains ``name_filter`` and is not a ``$folder$``
        placeholder.

        Args:
            bucket_name: Name of the S3 bucket to list.
            name_filter: Substring a key must contain to be considered.

        Returns:
            The lexicographically greatest matching key.

        Raises:
            NoVersionsFoundError: If no key in the bucket matches.
            ImportError: If boto3 is not installed.
        """
        paginator = _s3_client().get_paginator('list_objects_v2')

        matches = []
        for page in paginator.paginate(Bucket=bucket_name):
            # 'Contents' is absent when the bucket is empty.
            for obj in page.get('Contents', []):
                key = obj['Key']
                if name_filter in key and '$folder$' not in key:
                    matches.append(key)
        return _latest(matches, "s3://{}".format(bucket_name))