Wednesday, August 31, 2022

AWS CloudFormation - cross account


 

ToolsAccount

AWSTemplateFormatVersion: 2010-09-09
Description: >-
  bld account role/policy deployment

Parameters:
  BldAccountID:
    Description: 'Account ID to register Gitlab runner'
    Type: String
  RoleName:
    Description: 'Bld account role name'
    Type: String
    AllowedValues:
      - test
      - poc
  StgAccountID:
    Description: 'Account ID to conduct testing'
    Type: String

Resources:
  BldAccountRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub '${RoleName}'
      Description: allow ec2 and sagemaker services to assume the role # allow stg/prd to assume the role
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - sagemaker.amazonaws.com
            Action: sts:AssumeRole
          - Effect: Allow
            Principal:
              Service:
                - ec2.amazonaws.com
            Action: sts:AssumeRole
          - Effect: Allow
            Principal:
              AWS:
                - !Sub 'arn:aws:iam::${StgAccountID}:role/${RoleName}'
            Action: sts:AssumeRole
      Policies:
        - PolicyName: gitlab-runner-and-deployment
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action: '*'
                Resource: '*'
              - Effect: Allow
                Action:
                  - 'ecr:*'
                Resource: '*'
              - Effect: Allow
                Action:
                  - 'ec2:DescribeKeyPairs'
                  - 'ec2:TerminateInstances'
                  - 'ec2:StopInstances'
                  - 'ec2:StartInstances'
                  - 'ec2:RunInstances'
                  - 'ec2:RebootInstances'
                  - 'ec2:CreateKeyPair'
                  - 'ec2:DeleteKeyPair'
                  - 'ec2:ImportKeyPair'
                  - 'ec2:Describe*'
                  - 'ec2:CreateTags'
                  - 'ec2:RequestSpotInstances'
                  - 'ec2:CancelSpotInstanceRequests'
                  - 'ec2:DescribeSubnets'
                  - 'ec2:AssociateIamInstanceProfile'
                  - 'iam:PassRole'
                Resource: "*"

Outputs:
  BldAccountRoleArn:
    Value: !GetAtt BldAccountRole.Arn
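
To deploy this template from the bld/tools account, a boto3 call along the following lines can be used; the stack name, template filename, region, and account IDs below are placeholders, not values taken from the template. CAPABILITY_NAMED_IAM is required because the stack creates a named IAM role.

# Hypothetical deployment sketch for the bld-account template above; all names/IDs are placeholders.
import boto3

cfn = boto3.client("cloudformation", region_name="us-east-1")  # placeholder region

with open("bld-account-role.yaml") as f:  # assumed local filename for the template above
    template_body = f.read()

cfn.create_stack(
    StackName="bld-account-role",  # placeholder stack name
    TemplateBody=template_body,
    Parameters=[
        {"ParameterKey": "BldAccountID", "ParameterValue": "111111111111"},
        {"ParameterKey": "RoleName", "ParameterValue": "test"},
        {"ParameterKey": "StgAccountID", "ParameterValue": "222222222222"},
    ],
    Capabilities=["CAPABILITY_NAMED_IAM"],  # required because the template sets RoleName
)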

AssumeAccount(stg)

AWSTemplateFormatVersion: 2010-09-09
Description: >-
  environment specific account IAM role/policy deployment

Parameters:
  BldAccountID:
    Description: 'Account ID to register Gitlab runner'
    Type: String
  RoleName:
    Description: 'Bld account role name'
    Type: String
    Default: test
    AllowedValues:
      - test
      - poc
  Environment:
    Description: 'Current environment'
    Type: String
    AllowedValues:
      - stg
      - prd

Resources:
  CrossAccountAssumeRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Ref RoleName
      Description: allow bld role to assume the role
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              AWS:
                - !Sub 'arn:aws:iam::${BldAccountID}:role/Admin'
                - !Sub 'arn:aws:iam::${BldAccountID}:role/Staff'
                - !Sub 'arn:aws:iam::${BldAccountID}:role/${RoleName}'
            Action: sts:AssumeRole
      Policies:
        - PolicyName: !Sub ${Environment}-policy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action: '*'
                Resource: '*'
              - Effect: Allow
                Action:
                  - 'cloudformation:CreateStack'
                  - 'cloudformation:UpdateStack'
                  - 'cloudformation:DeleteStack'
                  - 'cloudformation:DescribeStacks'
                Resource: "*"
              - Effect: Allow
                Action:
                  - 's3:CreateBucket'
                  - 's3:DeleteBucket'
                  - 's3:DeleteBucket*'
                  - 's3:PutBucket*'
                Resource: "*"

Outputs:
  AssumeRoleAccountRoleArn:
    Value: !GetAtt CrossAccountAssumeRole.Arn
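
Together, the two templates form the cross-account chain: the Gitlab runner in the bld account runs as BldAccountRole, which the stg/prd role trusts, so the pipeline can pick up temporary credentials in the target account with a single sts:AssumeRole call. A minimal sketch, assuming the caller already holds the bld role; the account ID, role name, and session name are placeholders.

import boto3

sts = boto3.client("sts")

# Assume the stg-account role created by the template above (account ID is a placeholder).
resp = sts.assume_role(
    RoleArn="arn:aws:iam::222222222222:role/test",
    RoleSessionName="gitlab-runner-deploy",
)

creds = resp["Credentials"]
stg_cfn = boto3.client(
    "cloudformation",
    aws_access_key_id=creds["AccessKeyId"],
    aws_secret_access_key=creds["SecretAccessKey"],
    aws_session_token=creds["SessionToken"],
)
# stg_cfn now operates in the stg account under the <Environment>-policy permissions.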

AWS CloudFormation - check status

#!/bin/bash

WAIT_TYPE=$1
stack=$2
region=$3
changeset=$4

if [ "$WAIT_TYPE" = 'changeset' ]; then
  echo "Check $WAIT_TYPE Status..."

  stackStatus=$(aws cloudformation describe-change-set --region "$region" --stack-name "$stack" --change-set-name "$changeset" --query Status --output text)

  # note: a change set that cannot be created/applied ends in status FAILED
  until [ "$stackStatus" = "CREATE_COMPLETE" ] ||
        [ "$stackStatus" = "FAILED" ] ||
        [ "$stackStatus" = "CREATE_FAILED" ] ||
        [ "$stackStatus" = "DELETE_COMPLETE" ] ||
        [ "$stackStatus" = "DELETE_FAILED" ] ||
        [ "$stackStatus" = "ROLLBACK_COMPLETE" ] ||
        [ "$stackStatus" = "ROLLBACK_FAILED" ] ||
        [ "$stackStatus" = "UPDATE_COMPLETE" ] ||
        [ "$stackStatus" = "UPDATE_ROLLBACK_COMPLETE" ] ||
        [ "$stackStatus" = "UPDATE_ROLLBACK_FAILED" ]
  do
    ChangeSetId=$(aws cloudformation describe-change-set --region "$region" --stack-name "$stack" --change-set-name "$changeset" --query ChangeSetId --output text)
    echo "ChangeSet $stackStatus: $ChangeSetId ..."
    sleep 3
    stackStatus=$(aws cloudformation describe-change-set --region "$region" --stack-name "$stack" --change-set-name "$changeset" --query Status --output text)
  done
fi

if [ "$WAIT_TYPE" = 'stack' ]; then
  echo "Check $WAIT_TYPE Status..."

  stackStatus=$(aws cloudformation describe-stacks --region "$region" --stack-name "$stack" --query Stacks[0].StackStatus --output text)

  until [ "$stackStatus" = "CREATE_COMPLETE" ] ||
        [ "$stackStatus" = "CREATE_FAILED" ] ||
        [ "$stackStatus" = "DELETE_COMPLETE" ] ||
        [ "$stackStatus" = "DELETE_FAILED" ] ||
        [ "$stackStatus" = "ROLLBACK_COMPLETE" ] ||
        [ "$stackStatus" = "ROLLBACK_FAILED" ] ||
        [ "$stackStatus" = "UPDATE_COMPLETE" ] ||
        [ "$stackStatus" = "UPDATE_ROLLBACK_COMPLETE" ] ||
        [ "$stackStatus" = "UPDATE_ROLLBACK_FAILED" ]
  do
    eventId=$(aws cloudformation describe-stack-events --region "$region" --stack-name "$stack" --query StackEvents[0].PhysicalResourceId --output text)
    if [ "$eventId" != "$lastEventId" ]; then
      echo "Deploying/updating: $eventId"
      lastEventId=$eventId
    fi
    sleep 3
    stackStatus=$(aws cloudformation describe-stacks --region "$region" --stack-name "$stack" --query Stacks[0].StackStatus --output text)
    echo "Stack $stackStatus: $eventId ..."
  done

  echo "Stack Status: $stackStatus"
  if [ "$stackStatus" != "CREATE_COMPLETE" ] && [ "$stackStatus" != "UPDATE_COMPLETE" ] && [ "$stackStatus" != "DELETE_COMPLETE" ] && [ -n "$stackStatus" ]; then
    exit 1
  fi
fi
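
For reference, boto3 also ships CloudFormation waiters that encapsulate the same terminal-status polling, so the shell loop above can be reproduced in a few lines of Python; the stack name, change set name, and region below are placeholders.

import boto3

cfn = boto3.client("cloudformation", region_name="us-east-1")  # placeholder region

# Wait for change set creation to finish (the waiter raises if it ends in FAILED).
cfn.get_waiter("change_set_create_complete").wait(
    StackName="bld-account-role",   # placeholder stack name
    ChangeSetName="my-changeset",   # placeholder change set name
)

# Wait for the stack itself, then report the final status like the script above.
cfn.get_waiter("stack_create_complete").wait(StackName="bld-account-role")
status = cfn.describe_stacks(StackName="bld-account-role")["Stacks"][0]["StackStatus"]
print(f"Stack Status: {status}")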

Spark - SCD2 Type Update

import inspect
import logging
import warnings
from typing import Optional, Union

from pyspark.sql import DataFrame as SparkDataFrame

# BaseLoader and ReadSpark are assumed to be provided elsewhere in the same project.


class UpsertSCDSpark(BaseLoader):
    def __init__(
        self,
        primary_keys: Union[str, list, tuple],
        path: Optional[str] = None,
        dbtable: Optional[str] = None,
        tgt_dbtable: Optional[str] = None,
        update_timestamp: Optional[str] = None,
        src_eff_timestamp: Optional[str] = None,
        current_flag: Optional[str] = "current_flag",
        soft_delete_flag: Optional[str] = "soft_delete_flag",
        delete_timestamp: Optional[str] = "delete_ts",
        eff_ts: Optional[str] = "eff_ts",
        exp_ts: Optional[str] = "exp_ts",
        columns: Optional[list] = None,
        format: str = 'delta',
        *args,
        **kwargs
    ):
        super(UpsertSCDSpark, self).__init__(*args, **kwargs)

        self.primary_keys = primary_keys
        self.tgt_dbtable = tgt_dbtable
        self.update_timestamp = update_timestamp
        self.src_eff_timestamp = src_eff_timestamp
        self.current_flag = current_flag
        self.soft_delete_flag = soft_delete_flag
        self.delete_timestamp = delete_timestamp
        self.eff_ts = eff_ts
        self.exp_ts = exp_ts
        self.dbtable = dbtable
        self.columns_list = columns
        self.path = path
        self.format = format
        self._loader = ReadSpark

        if self.dbtable and self.path:
            raise ValueError('Only one of `dbtable` or `path` may be specified!')

        if not self.dbtable and not self.path:
            raise ValueError('No `dbtable` or `path` specified. One must be specified!')

        self.database, self.table = self._parse_dbtable(self.dbtable)

        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(self.log_level)

    def _get_df(self):
        params = {
            'path': self.path,
            'dbtable': self.dbtable,
            'format': self.format,
            'options': self.options,
            'spark_session': self._spark_session,
        }

        read = ReadSpark(**params)
        df = read.load()

        return df

    def _get_cols(self, tbl_a_alias, tbl_b_alias, keys, columns):
        # Build the column-change condition over the non-key columns.
        # Note: the trailing column is dropped from the comparison ([:-1]),
        # presumably to skip an audit/metadata column at the end of the list.
        exclude_key_list = [x for x in columns if x not in keys][:-1]

        a_list = [tbl_a_alias + '.' + col.strip() for col in exclude_key_list]
        b_list = [tbl_b_alias + '.' + col.strip() for col in exclude_key_list]
        col_conditions = " or ".join(a + "<>" + b for a, b in zip(a_list, b_list))

        return col_conditions

    def _get_scd_sql_contents(self, tbl_a_alias, tbl_b_alias, keys, tbl_src_alias="src", tbl_tgt_alias="tgt"):
        # get columns
        mergeKey_cols = ", ".join(tbl_a_alias + '.' + col + ' as mergeKey_' + col for col in keys)
        NULL_mergeKey_cols = ", ".join('NULL as mergeKey_' + col for col in keys)

        # get inner join conditions
        a_list = [tbl_a_alias + '.' + col.strip() for col in keys]
        b_list = [tbl_b_alias + '.' + col.strip() for col in keys]
        key_join_conditions = " and ".join(a + "=" + b for a, b in zip(a_list, b_list))

        # get outer join conditions
        src_list = [tbl_src_alias + '.mergeKey_' + col.strip() for col in keys]
        tgt_list = [tbl_tgt_alias + '.' + col.strip() for col in keys]
        merge_join_conditions = " and ".join(a + "=" + b for a, b in zip(src_list, tgt_list))

        return mergeKey_cols, NULL_mergeKey_cols, key_join_conditions, merge_join_conditions

    def _merge_tables(self):

        mergeKey_cols, NULL_mergeKey_cols, key_join_conditions, merge_join_conditions = \
            self._get_scd_sql_contents("a", "b", self.primary_keys)
        df_src = self._get_df()
        if self.columns_list:
            lower_cols = [col.lower() for col in self.columns_list]
        else:
            lower_cols = [col.lower() for col in df_src.columns]
        lower_keys = [col.lower() for col in self.primary_keys]
        src_tbl_alias, tgt_tbl_alias = 'src', 'tgt'
        inner_col_conditions = self._get_cols("a", "b", lower_keys, lower_cols)
        outer_col_conditions = self._get_cols(src_tbl_alias, tgt_tbl_alias, lower_keys, lower_cols)

        insert_columns = ",".join(df_src.columns)
        insert_values = ",".join([src_tbl_alias + '.' + col for col in df_src.columns])

        # The OR-joined column conditions are wrapped in parentheses so they do not
        # bleed into the surrounding AND clauses (AND binds tighter than OR in SQL).
        sql_merge_query = f"""
            MERGE INTO {self.tgt_dbtable} AS tgt
            USING (SELECT {mergeKey_cols}, a.*
                   FROM {self.dbtable} a
                   UNION ALL
                   SELECT {NULL_mergeKey_cols}, a.*
                   FROM {self.dbtable} a JOIN {self.tgt_dbtable} b ON {key_join_conditions}
                   WHERE b.{self.current_flag} = 'Y' AND ({inner_col_conditions})
            ) AS src ON {merge_join_conditions}
            WHEN MATCHED AND tgt.{self.current_flag} = 'Y' AND ({outer_col_conditions}) THEN
              UPDATE SET tgt.{self.current_flag} = 'N', tgt.{self.update_timestamp} = current_timestamp(), tgt.{self.exp_ts} = {self.src_eff_timestamp}
            WHEN NOT MATCHED THEN
              INSERT ({insert_columns}, {self.current_flag}, {self.soft_delete_flag}, {self.delete_timestamp}, {self.eff_ts}, {self.exp_ts})
              VALUES ({insert_values}, 'Y', 'N', current_timestamp(), {self.src_eff_timestamp}, current_timestamp())
        """

        self._spark_session.sql(sql_merge_query)

    def load(self, df: Optional[SparkDataFrame] = None) -> SparkDataFrame:

        self.logger.debug('{:}.{:} started...'.format(self.__class__.__name__, inspect.stack()[0][3]))

        df_src = self._get_df()
        if df_src.head():
            self._merge_tables()
            df = self._spark_session.table(f"{self.tgt_dbtable}")
        else:
            warnings.warn(
                "Upsert skipped! The input dataframe is empty."
            )
            # Nothing to merge: return the target table unchanged instead of an undefined/None frame.
            df = self._spark_session.table(f"{self.tgt_dbtable}")

        self.logger.debug('{:}.{:} completed.'.format(self.__class__.__name__, inspect.stack()[0][3]))

        return df
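
A hedged usage sketch of the loader above; the table, key, and timestamp column names are placeholders, and whatever additional constructor arguments BaseLoader itself expects (Spark session, reader options, log level) would still need to be supplied.

# Hypothetical usage; table/column names are placeholders and depend on your metastore.
upsert = UpsertSCDSpark(
    primary_keys=["customer_id"],
    dbtable="staging.customers",          # source table
    tgt_dbtable="curated.customers_scd",  # SCD2 target (Delta) table
    update_timestamp="update_ts",
    src_eff_timestamp="src_eff_ts",
    format="delta",
)

# Runs the MERGE when the source has rows, then returns the refreshed target table.
df_target = upsert.load()
df_target.show()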