How to Manage an EMR Workflow Using AWS Step Functions

Hardik Patel
3 min read · Jan 22, 2023


TL;DR

In AWS Step Functions, a workflow is called a state machine.

Step Functions is built from state machines and tasks: a state machine is a workflow, each step in the workflow is a state, and a task is a state that represents a single unit of work performed by another AWS service (here, Amazon EMR).
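The workflow we build in this post has three task states: create an EMR cluster, submit a Spark job to it, and terminate the cluster. As a rough Amazon States Language sketch (state names are illustrative, and each Task state still needs the Parameters covered later in this post):

```json
{
  "StartAt": "CreateEMRCluster",
  "States": {
    "CreateEMRCluster": {
      "Type": "Task",
      "Resource": "arn:aws:states:::elasticmapreduce:createCluster.sync",
      "Next": "RunSparkJob"
    },
    "RunSparkJob": {
      "Type": "Task",
      "Resource": "arn:aws:states:::elasticmapreduce:addStep.sync",
      "Next": "TerminateCluster"
    },
    "TerminateCluster": {
      "Type": "Task",
      "Resource": "arn:aws:states:::elasticmapreduce:terminateCluster.sync",
      "End": true
    }
  }
}
```

The `.sync` suffix on each resource ARN tells Step Functions to wait for the EMR operation to finish before moving to the next state, rather than returning as soon as the API call is accepted.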

Prerequisites

  • An AWS account
  • A basic understanding of AWS Step Functions state machines

We are going to configure a state machine with three steps: create an EMR cluster, submit a Spark job, and terminate the cluster. Click on the first step; here we need to pass configuration details as parameters to create the EMR cluster.

Update the API parameters value as below:

{
  "Name": "AI Cluster",
  "ServiceRole": "EMR_DefaultRole",
  "JobFlowRole": "EMR_EC2_DefaultRole",
  "ReleaseLabel": "emr-6.9.0",
  "AutoScalingRole": "EMR_AutoScaling_DefaultRole",
  "ScaleDownBehavior": "TERMINATE_AT_TASK_COMPLETION",
  "EbsRootVolumeSize": 10,
  "AutoTerminationPolicy": {
    "IdleTimeout": 3600
  },
  "Applications": [
    { "Name": "Hadoop" },
    { "Name": "Spark" }
  ],
  "LogUri": "s3n://aws-logs-xxxxxx-ap-south-1",
  "VisibleToAllUsers": true,
  "Instances": {
    "KeepJobFlowAliveWhenNoSteps": true,
    "InstanceGroups": [
      {
        "InstanceCount": 1,
        "EbsConfiguration": {
          "EbsBlockDeviceConfigs": [
            {
              "VolumeSpecification": {
                "SizeInGB": 32,
                "VolumeType": "gp2"
              },
              "VolumesPerInstance": 2
            }
          ]
        },
        "InstanceRole": "MASTER",
        "InstanceType": "m5.xlarge",
        "Name": "Master - 1"
      },
      {
        "InstanceCount": 2,
        "EbsConfiguration": {
          "EbsBlockDeviceConfigs": [
            {
              "VolumeSpecification": {
                "SizeInGB": 32,
                "VolumeType": "gp2"
              },
              "VolumesPerInstance": 2
            }
          ]
        },
        "InstanceRole": "CORE",
        "InstanceType": "m5.xlarge",
        "Name": "Core - 2"
      }
    ]
  }
}
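For orientation, this block goes into the Parameters field of the create-cluster task state. A minimal sketch of how it fits together (state names are illustrative, and the parameters are abbreviated here to the first few keys shown above):

```json
{
  "CreateEMRCluster": {
    "Type": "Task",
    "Resource": "arn:aws:states:::elasticmapreduce:createCluster.sync",
    "Parameters": {
      "Name": "AI Cluster",
      "ServiceRole": "EMR_DefaultRole",
      "JobFlowRole": "EMR_EC2_DefaultRole",
      "ReleaseLabel": "emr-6.9.0"
    },
    "Next": "RunSparkJob"
  }
}
```

Because of the `.sync` resource, this state does not complete until the cluster is up and running.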

Next, click on the second step, which submits the Spark job as an EMR step.

Update the API parameters as below:

{
  "ClusterId.$": "$.ClusterId",
  "Step": {
    "Name": "logic processor",
    "HadoopJarStep": {
      "Jar": "command-runner.jar",
      "Args": [
        "spark-submit",
        "--deploy-mode",
        "cluster",
        "--conf",
        "spark.yarn.appMasterEnv.ENVIRON=PROD",
        "--conf",
        "spark.yarn.appMasterEnv.SRC_DIR=s3://xxxx/xxx/xxx",
        "--conf",
        "spark.yarn.appMasterEnv.SRC_FILE_FORMAT=json",
        "--conf",
        "spark.yarn.appMasterEnv.TGT_DIR=s3://xxx/xxxx/xxxx",
        "--conf",
        "spark.yarn.appMasterEnv.TGT_FILE_FORMAT=parquet",
        "--conf",
        "spark.yarn.appMasterEnv.SRC_FILE_PATTERN=2022-01-13",
        "--py-files",
        "s3://xxxxxxx/app/xxxx.zip",
        "s3://xxxxx/app/app.py"
      ]
    }
  }
}
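A note on the `.$` suffix in `"ClusterId.$"`: it tells Step Functions to evaluate the value as a JsonPath expression against the state's input instead of treating it as a literal string. So if the state's input looks like this (hypothetical cluster ID):

```json
{
  "ClusterId": "j-2AXXXXXXGAPLF"
}
```

then `"ClusterId.$": "$.ClusterId"` resolves to that ID in the AddJobFlowSteps API call, which is how the step gets attached to the cluster created earlier.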

Now click on the Output tab of the create-cluster step and update it. We need to modify the output so that the EMR cluster ID is extracted and passed on to the next step.
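One way to express this (a sketch, assuming the create-cluster integration returns a top-level ClusterId in its output, with state names illustrative and Parameters omitted for brevity) is a ResultSelector on the create-cluster state, so that only the cluster ID flows to the next state:

```json
{
  "CreateEMRCluster": {
    "Type": "Task",
    "Resource": "arn:aws:states:::elasticmapreduce:createCluster.sync",
    "ResultSelector": {
      "ClusterId.$": "$.ClusterId"
    },
    "Next": "RunSparkJob"
  }
}
```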

Now the third step, which terminates the cluster.

Update the API parameters as below:

{
  "ClusterId.$": "$.ClusterId"
}
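This parameters block plugs into the terminate-cluster task state; a minimal sketch (state name illustrative) might look like:

```json
{
  "TerminateCluster": {
    "Type": "Task",
    "Resource": "arn:aws:states:::elasticmapreduce:terminateCluster.sync",
    "Parameters": {
      "ClusterId.$": "$.ClusterId"
    },
    "End": true
  }
}
```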

After starting the state machine, the execution's event history shows the events generated at each step, and you can confirm that the state machine ran to completion successfully.

We have now seen a basic workflow that creates an EMR cluster, runs a Spark job on it, and then terminates the cluster.

The Step Functions integration with EMR is available in all Regions, so we can build more complex workflows for jobs that execute on EMR clusters.

If you found this guide helpful, click the 👏 button and feel free to drop a comment.

Follow for more stories like this.
