AWS — Orchestration of ETL Jobs
As we venture into the realm of data transformation, ETL jobs stand as architects, meticulously shaping unrefined data into meaningful structures. Enter AWS — the vast cloud platform where this digital design unfolds.
This article serves as your guide to the integration between ETL jobs and AWS, illustrating how the two work together to redefine the landscape of data management. We will walk through the technical details of this collaboration and show how businesses can leverage it to optimize their data strategies.
Let’s first try to understand the overall workflow of the architecture using a flow chart and then we will take an in-depth look at each of the steps.
The initial scheduling and automation of the jobs in this orchestration are achieved via EventBridge rules. Each job has an EventBridge rule defined for it that contains information in its input transformer, like the data source location for the ETL job, the name of the state machine that needs to be triggered for the job, subscribers to get the job status notification, job parameters to be passed to the ETL jobs, and other information.
// INPUT PATH
{
"timestamp": "$.time"
}
// INPUT TEMPLATE
{
"machineName": "state_machine_1",
"key1": "value1",
"key2": "value2",
"emailRecepients": {
"job1":["abc@example.com","xyz@example.com"]
},
"jobName": [
"job1"
],
"jobSources": {
"job1": "1##s3://bucket1/path1"
},
"arguments":{
"trx_date":{
"type":"timestamp",
"input": <timestamp>,
"definition": {
"inputFormat": "%Y-%m-%dT%H:%M:%SZ",
"outputFormat": "%Y%m%d"
}
},
"process_time":{
"type":"timestamp",
"input": <timestamp>,
"definition": {
"inputFormat": "%Y-%m-%dT%H:%M:%SZ",
"outputFormat": "%Y-%m-%dT%H:%M:%SZ"
}
}
}
}
The target of each EventBridge rule is an AWS Lambda function discussed in the following section.
This dynamic AWS Lambda function reads the input from the rule and processes it as per the requirements. It then triggers the Step Functions state machine for the respective ETL job with all the required input parameters. Here is what the lambda looks like:
from datetime import datetime
import logging, boto3, json, dateutil.tz
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def datetimeEval(inputValue, inputFormat, outputFormat):
    """Re-format a timestamp string, expressed in Asia/Kolkata time.

    Args:
        inputValue: Timestamp text to parse.
        inputFormat: ``strptime`` pattern describing ``inputValue``.
        outputFormat: ``strftime`` pattern used for the returned string.

    Returns:
        ``inputValue`` converted to the Asia/Kolkata zone and rendered with
        ``outputFormat``.

    Raises:
        ValueError: If ``inputValue`` does not match ``inputFormat``.
    """
    parsed = datetime.strptime(inputValue, inputFormat)
    if parsed.tzinfo is None:
        # A pattern without %z yields a naive datetime, and astimezone() would
        # interpret a naive value in the host's local zone, making the result
        # depend on where the code runs. Pin naive inputs to UTC instead.
        # NOTE(review): assumes inputs are UTC (the configured formats end in
        # a literal "Z") - confirm against the rule's input transformer.
        parsed = parsed.replace(tzinfo=dateutil.tz.tzutc())
    # NOTE(review): lambda_handler in this file uses 'Asia/Jakarta' for the
    # execution name; confirm 'Asia/Kolkata' is the intended zone here.
    target_timezone = dateutil.tz.gettz('Asia/Kolkata')
    return parsed.astimezone(target_timezone).strftime(outputFormat)
def getStateMachinesFunction():
    """Return all state machines reported by ``list_state_machines``.

    Returns:
        A list with the ``stateMachines`` entries of every result page.
    """
    client = boto3.client('stepfunctions')
    # A single list_state_machines call returns one page only; walk every page
    # so that state machines beyond the first page are not silently missed.
    paginator = client.get_paginator('list_state_machines')
    state_machines = []
    for page in paginator.paginate(PaginationConfig={'PageSize': 1000}):
        state_machines.extend(page['stateMachines'])
    return state_machines
def lambda_handler(event, context):
    """Build the state machine input from the rule payload and start it.

    Args:
        event: Payload with the keys ``arguments``, ``emailRecepients``,
            ``machineName``, ``jobName`` and ``jobSources``. Each entry of
            ``arguments`` carries a ``type`` and an ``input``; entries of type
            ``timestamp`` also carry a ``definition`` with ``inputFormat`` and
            ``outputFormat``.
        context: Lambda context object (unused).

    Raises:
        KeyError: If a required key is missing from ``event`` or no state
            machine named ``event["machineName"]`` is found.
    """
    logger.info("Full event: %s", event)
    arguments = event["arguments"]
    new_subscribers = event['emailRecepients']
    stateMachineName = event["machineName"]
    job_name = event['jobName']
    job_sources = event['jobSources']
    # rest of the parameters from eventbridge

    state_machine_input = {}
    for argument, spec in arguments.items():
        argument_type = spec["type"]
        if argument_type == "timestamp":
            definition = spec['definition']
            state_machine_input[argument] = datetimeEval(
                spec['input'],
                definition['inputFormat'],
                definition['outputFormat'],
            )
        elif argument_type in ("integer", "string", "bool"):
            state_machine_input[argument] = spec["input"]
        else:
            # Still skipped, but no longer silently: a typo in "type" would
            # otherwise make the argument vanish from the job input.
            logger.warning(
                "Skipping argument %s with unsupported type %s",
                argument,
                argument_type,
            )
    state_machine_input['recepients'] = new_subscribers
    state_machine_input['job_name'] = job_name
    state_machine_input['job_sources'] = job_sources
    logger.info("State machine input: %s", state_machine_input)

    # Date and millisecond-resolution time make the execution name unique.
    time_now = datetime.now(dateutil.tz.gettz('Asia/Jakarta'))
    processDate = time_now.strftime("%Y%m%d")
    processTime = time_now.strftime('%H%M%S%f')[:-3]

    arnDict = {
        sm['name']: sm['stateMachineArn'] for sm in getStateMachinesFunction()
    }
    logger.debug("Known state machines: %s", sorted(arnDict))
    if stateMachineName not in arnDict:
        raise KeyError(f"State machine {stateMachineName!r} was not found")

    stfn_client = boto3.client("stepfunctions")
    response = stfn_client.start_execution(
        stateMachineArn=arnDict[stateMachineName],
        name=f"executing-{stateMachineName}-{processDate}-{processTime}",
        input=json.dumps(state_machine_input),
    )
    logger.info("Started execution: %s", response.get('executionArn'))
Next comes the state machine execution:
- Each state machine first runs a precheck to verify that the source data for the ETL job is available in S3. This is achieved with the help of yet another dynamic Lambda function. If the source data is unavailable, a notification saying so is sent to the users; otherwise, the Glue job is triggered. Here is what the precheck Lambda looks like:
import json
from datetime import datetime, timedelta
import boto3, logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def parse_s3_path(path):
    """Split an S3 location into bucket name and key prefix.

    Args:
        path: Location such as ``s3://bucket/some/prefix``; the ``s3://``
            scheme is optional.

    Returns:
        A ``(bucket_name, key_prefix)`` tuple. ``key_prefix`` is ``""`` when
        the path holds nothing after the bucket name.
    """
    scheme = "s3://"
    # Strip the scheme only where it leads the path; a blanket replace() would
    # also delete any later "s3://" occurring inside the key itself.
    if path.startswith(scheme):
        path = path[len(scheme):]
    bucket_name, _, key_prefix = path.partition("/")
    return bucket_name, key_prefix
def get_check_date(trx_date, workflow_period):
    """Derive the date suffix to look for under a source path.

    Args:
        trx_date: Transaction date formatted as ``YYYYMMDD``.
        workflow_period: ``'NULL'``, ``'CRAWL'`` or a number of days as text.

    Returns:
        ``'NULL'`` when ``workflow_period`` is ``'NULL'``; the first day of
        ``trx_date``'s month for ``'CRAWL'``; otherwise ``trx_date`` moved back
        by ``workflow_period`` days. Dates are formatted as ``YYYY-MM-DD``.
    """
    # 'NULL' short-circuits before trx_date is parsed at all.
    if workflow_period == 'NULL':
        return 'NULL'

    base_date = datetime.strptime(trx_date, "%Y%m%d")
    if workflow_period == 'CRAWL':
        resolved = base_date.replace(day=1)
    else:
        resolved = base_date - timedelta(days=int(workflow_period))
    return resolved.strftime('%Y-%m-%d')
def check_data_exists(s3_client, path):
    """Report whether any object is listed under the given S3 path.

    Args:
        s3_client: Client object exposing ``list_objects_v2``.
        path: S3 location, split with ``parse_s3_path`` into the bucket and
            the prefix to list.

    Returns:
        ``True`` when the listing reports a ``KeyCount`` above zero, else
        ``False``.
    """
    bucket, prefix = parse_s3_path(path)
    logger.info("checking for data at s3://%s/%s", bucket, prefix)
    listing = s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix)
    return listing.get('KeyCount', 0) > 0
def _extract_path_info(sources):
    """Return one job's source entries as a list of "<period>##<path>" strings."""
    if isinstance(sources, dict):
        return sources['pathInfo']
    if isinstance(sources, str):
        # A bare string such as "1##s3://bucket1/path1" is a single entry.
        return [sources]
    return list(sources)


def lambda_handler(event, context):
    """Check that the source data of the first queued job exists in S3.

    Args:
        event: Either the payload itself or a dict holding it under ``body``.
            The payload carries ``trx_date``, ``job_name`` (list of job
            names), ``job_sources`` (job name -> source entries) and
            ``recepients``. Each source entry reads
            ``"<workflow_period>##<s3 path>"``.
        context: Lambda context object (unused).

    Returns:
        ``{'statusCode': 200, 'body': <json string>}``. The JSON body holds
        the remaining job names, ``status_code`` (``'404'`` if any path is
        missing, else ``'200'``), ``path_status_mapping``, ``message`` and the
        passed-through ``trx_date``, ``recepients`` and ``job_sources``.
    """
    # Accept both a wrapped ({"body": {...}}) and a bare payload without
    # turning the event into a self-referencing dict.
    body = event['body'] if 'body' in event else event
    logger.info("Full event: %s", event)
    trx_date = body['trx_date']
    job_name_list = body['job_name']
    job_sources = body['job_sources']
    recepients = body['recepients']
    # rest of the parameters
    s3_client = boto3.client('s3')
    job_name = job_name_list[0]
    logger.info("----- job_name: %s -----", job_name)

    path_status_mapping = {}
    status_code = '200'
    if job_name in job_sources:
        for entry in _extract_path_info(job_sources[job_name]):
            parts = entry.split("##")
            workflow_period, path = parts[0], parts[1]
            logger.info("path: %s", path)
            logger.info("workflow_period: %s", workflow_period)
            check_date = get_check_date(trx_date, workflow_period)
            if check_date != 'NULL':
                path = f"{path}{check_date}"
            # One listing per path; the result feeds both the per-path mapping
            # and the overall status.
            data_found = check_data_exists(s3_client, path)
            path_status_mapping[path] = data_found
            if not data_found:
                status_code = '404'
        # Derive the message from the overall status so the two cannot
        # disagree when only some of the paths are missing.
        if status_code == '404':
            message = f"DATA NOT FOUND FOR {job_name}!"
        else:
            message = f"DATA FOUND FOR {job_name}."
    else:
        # NOTE(review): a job without configured sources previously crashed on
        # unbound locals; it is now treated as "nothing to check" - confirm
        # this is the desired outcome.
        logger.warning("No job_sources entry for %s; precheck skipped", job_name)
        message = f"NO SOURCES CONFIGURED FOR {job_name}; PRECHECK SKIPPED."
    logger.info(message)

    response = {
        # The checked job is dropped from the list that is handed on.
        'job_name': job_name_list[1:],
        'status_code': status_code,
        'path_status_mapping': path_status_mapping,
        'message': message,
        'trx_date': trx_date,
        'recepients': recepients,
        'job_sources': job_sources,
    }
    return {
        'statusCode': 200,
        'body': json.dumps(response)
    }
- This glue job is responsible for performing the ETL on the source data. Once the glue job finishes with any status, the next state in the state machine is the dynamic lambda function to create job status notifications.
- This dynamic Lambda function is designed to send alerts to the job-specific subscribers of an SNS topic. In our case, there are multiple subscribers on a topic, but not all subscribers are supposed to receive notifications for all the jobs. There are subscribers assigned to each job name, and only those subscribers of the topic must receive the job status notification. To meet this requirement, the notification Lambda is designed such that it first updates the subscriptions on the SNS topic based on the information present in the input transformer of the EventBridge rule and then sends the alerts to the specific subscribers. For more detail, see our article on this topic:
https://medium.com/@skuad-engineering/send-message-to-specific-subscription-of-topic-in-aws-sns-8af98ce733f0
import dateutil.tz, boto3, json, re, logging, datetime
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def convert_epoch_to_jkt(epoch_time):
    """Convert an epoch timestamp in milliseconds to Asia/Jakarta time.

    Args:
        epoch_time: Milliseconds since the Unix epoch.

    Returns:
        A timezone-aware ``datetime`` in the Asia/Jakarta zone.
    """
    # Build an aware UTC value explicitly; fromtimestamp() without tz returns
    # naive host-local time, which the log line below would mislabel as UTC.
    utc_time = datetime.datetime.fromtimestamp(
        epoch_time / 1000.0, tz=datetime.timezone.utc
    )
    print(f"utc: {utc_time}")
    # The file imports "dateutil.tz", so the bare name "tz" is undefined.
    jkt_timezone = dateutil.tz.gettz('Asia/Jakarta')
    return utc_time.astimezone(jkt_timezone)
def add_job_info_for_new_sns_endpoint(subscriptions,subscription_dict,sns,job_name):
    """Add ``job_name`` to the filter policy of each listed subscriber.

    Args:
        subscriptions: Endpoints (e-mail addresses) that must receive alerts
            for ``job_name``.
        subscription_dict: Mapping of endpoint -> subscription ARN.
        sns: SNS client used to read and write subscription attributes.
        job_name: Job to add to the ``job_name`` list of the filter policy.

    Endpoints missing from ``subscription_dict`` are skipped.
    """
    for email_address in subscriptions:
        if email_address not in subscription_dict:
            continue
        logger.info(
            "Email address %s already exists in subscription list", email_address
        )
        subscription_arn = subscription_dict[email_address]
        sns_response = sns.get_subscription_attributes(
            SubscriptionArn=subscription_arn
        )
        # Tolerate a subscription without a filter policy, or a policy without
        # a "job_name" key, instead of failing inside json.loads / "in".
        filter_policy = sns_response.get('Attributes', {}).get('FilterPolicy')
        current_policy = json.loads(filter_policy) if filter_policy else {}
        jobs_in_filter_policy = current_policy.get('job_name') or []
        if job_name in jobs_in_filter_policy:
            # Already subscribed to this job: nothing to write.
            continue
        jobs_in_filter_policy.append(job_name)
        new_filter_policy = {'job_name': jobs_in_filter_policy}
        logger.info(f"Filter policy for subscription {subscription_arn}: {new_filter_policy}")
        # Write the final policy in a single call. Resetting it to {} first
        # would briefly leave the subscription matching every message.
        sns.set_subscription_attributes(
            SubscriptionArn=subscription_arn,
            AttributeName='FilterPolicy',
            AttributeValue=json.dumps(new_filter_policy)
        )
def remove_job_info_from_sns_endpoint(subscriptions,subscription_dict,sns, job_name):
    """Remove ``job_name`` from the filter policy of every other subscriber.

    Args:
        subscriptions: Endpoints that must keep receiving alerts for
            ``job_name``; they are left untouched.
        subscription_dict: Mapping of endpoint -> subscription ARN for the
            subscribers of the topic.
        sns: SNS client used to read and write subscription attributes.
        job_name: Job to remove from the ``job_name`` list of the policy.
    """
    for endpoint, subscription_arn in subscription_dict.items():
        if endpoint in subscriptions:
            continue
        sns_response = sns.get_subscription_attributes(
            SubscriptionArn=subscription_arn
        )
        # Tolerate a subscription without a filter policy, or a policy without
        # a "job_name" key, instead of failing inside json.loads / "in".
        filter_policy = sns_response.get('Attributes', {}).get('FilterPolicy')
        current_policy = json.loads(filter_policy) if filter_policy else {}
        jobs_in_filter_policy = current_policy.get('job_name') or []
        if job_name not in jobs_in_filter_policy:
            # Not subscribed to this job: nothing to write.
            continue
        remaining_jobs = [job for job in jobs_in_filter_policy if job != job_name]
        if not remaining_jobs:
            # Keep the same "" placeholder that add_filter_policy_in_sns_endpoint
            # writes, so the list is never empty.
            # NOTE(review): presumably SNS rejects an empty value list - confirm.
            remaining_jobs = [""]
        new_filter_policy = {'job_name': remaining_jobs}
        logger.info(f"Filter policy for subscription {subscription_arn}: {new_filter_policy}")
        # Write the final policy in a single call. Resetting it to {} first
        # would briefly leave the subscription matching every message.
        sns.set_subscription_attributes(
            SubscriptionArn=subscription_arn,
            AttributeName='FilterPolicy',
            AttributeValue=json.dumps(new_filter_policy)
        )
def add_filter_policy_in_sns_endpoint(subscription_dict,sns):
    """Give every subscription lacking a filter policy a placeholder one.

    Args:
        subscription_dict: Mapping of endpoint -> subscription ARN.
        sns: SNS client used to read and write subscription attributes.

    A subscription is updated when its attributes are absent or carry no
    ``FilterPolicy``; the policy written is ``{"job_name": [""]}``.
    """
    placeholder_policy = json.dumps({"job_name":[""]})
    for subscription_arn in subscription_dict.values():
        attributes = sns.get_subscription_attributes(
            SubscriptionArn=subscription_arn
        ).get('Attributes', None)
        already_filtered = (
            attributes is not None
            and attributes.get('FilterPolicy', None) is not None
        )
        if already_filtered:
            continue
        sns.set_subscription_attributes(
            SubscriptionArn=subscription_arn,
            AttributeName='FilterPolicy',
            AttributeValue=placeholder_policy
        )
def get_topic_arn(sns,topic_name="aws_glue_job_notification"):
    """Look up the ARN of an SNS topic by name.

    Args:
        sns: SNS client.
        topic_name: Name of the topic to find.

    Returns:
        The ARN whose last ``:``-separated segment equals ``topic_name``. If
        there is none, the first ARN that merely contains ``topic_name``; if
        that is missing too, ``None``.
    """
    first_partial_match = None
    # list_topics is paginated; walk every page instead of only the first.
    paginator = sns.get_paginator('list_topics')
    for page in paginator.paginate():
        for sns_topic in page.get('Topics', []):
            arn = sns_topic.get('TopicArn')
            if not arn:
                continue
            if arn.rsplit(':', 1)[-1] == topic_name:
                return arn
            # Substring matching is kept as a fallback for existing callers,
            # but an exact name always wins over a similarly named topic.
            if first_partial_match is None and topic_name in arn:
                first_partial_match = arn
    return first_partial_match
def _job_name_from_text(text, pattern, fallback_pattern=None):
    """Extract a job name from free text, raising ValueError when none is found."""
    match = re.search(pattern, text)
    if match is not None:
        return match.group()
    if fallback_pattern is not None:
        match = re.search(fallback_pattern, text)
        if match is not None:
            return match.group(1)
    raise ValueError(f"Could not determine the job name from: {text!r}")


def _build_notification(event, trx_date):
    """Return ``(job_name, subject, message)`` for the outcome held in ``event``."""
    body = event['body']
    status_code = body['status_code']

    if status_code == '404':
        # Source data missing: list only the paths that were not found.
        missing_sources = [
            path for path, found in body['path_status_mapping'].items() if not found
        ]
        # The job name is only available inside the precheck message. Names
        # without "_agj" are recovered from the "DATA [NOT ]FOUND FOR <job>"
        # wording that the precheck lambda in this file produces.
        job_name = _job_name_from_text(
            body['message'],
            r"\b\w*_agj\w*\b",
            r"DATA (?:NOT )?FOUND FOR (.+)[.!]$",
        )
        source_list = "\n".join('\t' + source for source in missing_sources)
        subject = f"[INFO][{job_name}]{trx_date}"
        message = f"""Hi team \nData is not present in the bucket for {job_name} job. \nPlease check the job sources and rerun the job. \nJob Sources: \n{source_list} \nThanks \nTeam 360CA"""
    elif status_code == '200' and 'error' in event:
        cause = event['error']['Cause']
        try:
            cause_json = json.loads(cause)
        except (TypeError, ValueError):
            cause_json = None
        if not isinstance(cause_json, dict):
            # Job does not exist or role has no permission to run glue job
            # any scenario before execution of glue job
            job_name = _job_name_from_text(cause, r"\b\w*_agj_\w*\b")
            message = cause
            subject = f"[ERR][{job_name}] {trx_date}"
        else:
            job_name = cause_json['JobName']
            job_run_id = cause_json['Id']
            error_message = cause_json['ErrorMessage']
            subject = f"[ERR][{job_name}] {trx_date}"
            # NOTE(review): the console link hard-codes region ap-south-1 -
            # confirm it matches the region the Glue job runs in.
            message = f"""Hi,\n\nThe Glue job has failed, please help to check on below link \n \nJob Name - {job_name}\nError Message - {error_message} \nLink for Job Run - https://ap-south-1.console.aws.amazon.com/gluestudio/home?region=ap-south-1#/job/{job_name}/run/{job_run_id}"""
    elif status_code == '200':
        job_name = event['output']['JobName']
        job_run_id = event['output']['Id']
        subject = f"[SUCCESS][{job_name}] {trx_date}"
        message = f"""Hi, \n\nThe Glue job has completed successfully.\n\nJob Name - {job_name}\nJob Run ID - {job_run_id} \nLink for Job Run - https://ap-south-1.console.aws.amazon.com/gluestudio/home?region=ap-south-1#/job/{job_name}/run/{job_run_id}"""
    else:
        raise ValueError(f"Unsupported status_code: {status_code!r}")
    return job_name, subject, message


def lambda_handler(event, context):
    """Publish the job status notification to the job's own subscribers.

    Args:
        event: State machine payload. ``event['body']`` carries ``trx_date``,
            ``recepients`` (job name -> list of endpoints) and
            ``status_code``; for status ``'404'`` also ``path_status_mapping``
            and ``message``. A failed job adds ``event['error']['Cause']``, a
            successful one ``event['output']``.
        context: Lambda context object (unused).

    Raises:
        ValueError: If the status code is unsupported or no job name can be
            extracted from the message / error cause.
        KeyError: If the job has no entry in ``recepients``.
        RuntimeError: If the notification topic cannot be found.
    """
    logger.info("Full event: %s", event)
    trx_date = event['body']['trx_date']
    recepients_details = event['body']['recepients']
    # rest of the parameters
    job_name, subject, message = _build_notification(event, trx_date)

    sns = boto3.client('sns')
    # Getting subscription endpoint for whom the alert need to be send
    subscriptions = recepients_details[job_name]
    topic_arn = get_topic_arn(sns,"aws_glue_job_notification")
    if topic_arn is None:
        raise RuntimeError("SNS topic 'aws_glue_job_notification' was not found")

    # Getting all subscriber of the SNS topic and mapping the ARN value to endpoint(email) of subscription
    subscription_dict = {}
    # Subscriptions are listed page by page; collect all of them.
    paginator = sns.get_paginator('list_subscriptions_by_topic')
    for page in paginator.paginate(TopicArn=topic_arn):
        for subscription in page['Subscriptions']:
            if subscription['SubscriptionArn'] != "PendingConfirmation":
                subscription_dict[subscription['Endpoint']] = subscription['SubscriptionArn']

    add_filter_policy_in_sns_endpoint(subscription_dict, sns)
    remove_job_info_from_sns_endpoint(subscriptions, subscription_dict, sns,job_name)
    add_job_info_for_new_sns_endpoint(subscriptions, subscription_dict, sns,job_name)

    # NOTE(review): filter policy changes may not take effect immediately, so
    # a message published right after an update could still be matched against
    # the previous policy - confirm against the SNS documentation.
    message_attributes = {
        'job_name': {
            'DataType': 'String',
            "StringValue": job_name
        }
    }
    sns.publish(
        TopicArn=topic_arn,
        Subject=subject,
        Message=message,
        MessageAttributes=message_attributes
    )
- If every state in the state machine executes without failure, the state machine execution is marked as SUCCEEDED. This is misleading in two cases: when the source data for the ETL job is missing from the S3 bucket and a data-unavailability notification has been sent to the subscribers, and when the Glue job fails and the subscribers have received a failure notification. In both cases the notification Lambda itself completes normally, so the execution would still be marked as SUCCEEDED, although it is supposed to be marked as FAILED. Hence, if the Lambda function responsible for creating the “data not available” notification is triggered, or the Glue job completes with a “FAILED” state, an explicit Fail state is added as the next state to mark the state machine execution as FAILED. Here is the definition of the state machine we created:
{
"Comment": "Run glue job",
"StartAt": "Pass",
"States": {
"Pass": {
"Type": "Pass",
"Next": "preCheckLambda",
"ResultPath": null
},
"preCheckLambda": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Parameters": {
"Payload.$": "$",
"FunctionName": "arn:aws:lambda:ap-southeast-3:463641729692:function:job_data_precheck"
},
"Retry": [
{
"ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.SdkClientException",
"Lambda.TooManyRequestsException"
],
"IntervalSeconds": 2,
"MaxAttempts": 6,
"BackoffRate": 2
}
],
"Next": "JobSourcePresentOrNot",
"ResultSelector": {
"body.$": "States.StringToJson($.Payload.body)"
}
},
"JobSourcePresentOrNot": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.body.status_code",
"StringEquals": "404",
"Next": "sourceNotFoundNotification"
},
{
"Variable": "$.body.status_code",
"StringEquals": "200",
"Next": "executeJobNotebook"
}
]
},
"sourceNotFoundNotification": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Parameters": {
"FunctionName": "arn:aws:lambda:ap-southeast-3:463641729692:function:job_trigger_sns2",
"Payload.$": "$",
"InvocationType": "RequestResponse"
},
"ResultPath": null,
"Next": "Fail"
},
"Fail": {
"Type": "Fail"
},
"executeJobNotebook": {
"Type": "Task",
"Resource": "arn:aws:states:::glue:startJobRun.sync",
"Parameters": {
"JobName": "dt_code_glue",
"Arguments": {
"--trx_date.$": "$.body.trx_date"
}
},
"Next": "JobSuccessNotification",
"Catch": [
{
"ErrorEquals": [
"States.TaskFailed"
],
"Next": "JobFailureNotification",
"ResultPath": "$.error"
}
],
"ResultPath": "$.output"
},
"JobSuccessNotification": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"Payload.$": "$",
"FunctionName": "arn:aws:lambda:ap-southeast-3:463641729692:function:job_trigger_sns2"
},
"Retry": [
{
"ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.SdkClientException",
"Lambda.TooManyRequestsException"
],
"IntervalSeconds": 2,
"MaxAttempts": 6,
"BackoffRate": 2
}
],
"End": true
},
"JobFailureNotification": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Parameters": {
"FunctionName": "arn:aws:lambda:ap-southeast-3:463641729692:function:job_trigger_sns2",
"Payload.$": "$",
"InvocationType": "RequestResponse"
},
"ResultPath": null,
"Next": "Fail"
}
}
}
Developer Contribution — Kunal Gupta, Aamir Khan, and Shashank Tiwari