import boto3
import json
import logging
from botocore.exceptions import ClientError
s3 = boto3.resource('s3')
s3_client = boto3.client('s3')
def lambda_handler(event, context):
print(event)
prd_bucket = event.get('prd_bucket')
prd_prefix = event.get('prd_prefix')
prd_folder_to_copy = event.get('prd_folder_to_copy')
stg_bucket = event.get('stg_bucket')
stg_prefix = event.get('stg_prefix')
stg_folder_to_copy = prd_folder_to_copy
def list_bucket_objects(bucket_name, prefix_name):
# Retrieve the list of bucket objects
try:
response = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=prefix_name)
except ClientError as e:
logging.error(e)
return None
return response['Contents']
logging.info(f'delete existing test data from {stg_bucket}...')
stg_objects = list_bucket_objects(stg_bucket, stg_prefix)
if stg_objects:
for obj in stg_objects:
file_key = obj['Key']
if len(stg_folder_to_copy) > 1 and stg_folder_to_copy in file_key:
to_delete = [{'Key': file_key}]
logging.info(f"delete existing test data {obj['Key']}...")
if to_delete:
s3_client.delete_objects(Bucket=stg_bucket, Delete={'Objects': to_delete})
logging.info(f"Copy new test data from {prd_bucket} to {stg_bucket}...")
objects = list_bucket_objects(prd_bucket, prd_prefix)
if objects:
for obj in objects:
logging.info("file_location:", obj['Key'])
if prd_folder_to_copy in obj['Key']:
file_name = obj['Key'].split('/')[-1]
dest = s3.Bucket(stg_bucket)
source = {'Bucket': prd_bucket, 'Key': obj['Key']}
dest.copy(source, stg_prefix + stg_folder_to_copy + file_name)
return {
'statusCode': 200,
'body': json.dumps('Success!')
}
Thursday, January 5, 2023
Lambda - delete and copy files from S3 sub-folders
Subscribe to:
Post Comments (Atom)
No comments:
Post a Comment