AWS EKS — Installation, Running and Monitoring PySpark Job Using AWS EMR

Skuad Engineering
6 min read · Dec 5, 2023


Amazon EMR (Elastic MapReduce) is a cloud-based big data platform that allows users to process vast amounts of data quickly and cost-effectively. Leveraging the power of Kubernetes, specifically Amazon EKS (Elastic Kubernetes Service), enhances the flexibility and scalability of EMR. In this article, we will explore the installation process of EMR on EKS and delve into running jobs on EKS.

Please note that Amazon does not offer a free tier for EKS, so creating an EKS cluster will incur charges on your AWS account.

Installation

Prerequisites: This article assumes the following tools are installed and configured on your machine: the AWS CLI (with credentials configured), eksctl, kubectl, and Helm.

Next, let’s create an EKS cluster using eksctl. Before that, we will create an EC2 key pair with the AWS CLI; it will be used for SSH access to the node group of the EKS cluster.

aws ec2 create-key-pair --region ap-south-1 --key-name <keypair_name>
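
If you also want to SSH into the worker nodes later, you can save the returned private key material to a local .pem file when creating the key pair (a sketch of the same command; the output file name is just an example):

# Save the private key to a local file so it can be used with ssh
aws ec2 create-key-pair \
--region ap-south-1 \
--key-name <keypair_name> \
--query 'KeyMaterial' \
--output text > <keypair_name>.pem

# Restrict the file permissions so ssh accepts the key
chmod 400 <keypair_name>.pem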

We can now create the cluster with eksctl, which provisions the underlying resources through AWS CloudFormation.

eksctl create cluster \
--name <cluster_name> \
--region ap-south-1 \
--with-oidc \
--ssh-access \
--ssh-public-key <keypair_name> \
--instance-types=m5.xlarge \
--managed \
--nodes 3 \
--version <Kubernetes latest version number> \
--nodegroup-name demo \
--enable-ssm

Once the cluster is created, validate it using the command below, and also check the CloudFormation console to confirm that the stacks have completed:

kubectl get nodes -o wide
Image: Validating the cluster creation
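
You can also confirm the cluster and its managed node group from the command line (a quick sketch using eksctl):

# List the cluster and the node group created above (names as used in this article)
eksctl get cluster --region ap-south-1
eksctl get nodegroup --cluster <cluster_name> --region ap-south-1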

Proceeding further, the next step involves establishing the OIDC (OpenID Connect) identity provider for our cluster.

eksctl utils associate-iam-oidc-provider --cluster <cluster_name> --approve
Image: Creating an IAM OIDC provider for the cluster

Now, we will create a namespace for our cluster.

kubectl create namespace aws-poc-2
Image: Creating a namespace

Once we have these things in place, we will map a service account to the newly created namespace.

eksctl create iamserviceaccount \
--cluster <cluster_name> \
--namespace aws-poc-2 \
--name aws-poc \
--attach-policy-arn <policy> \
--approve

Image: CloudFormation stacks displaying the deployed resources

In the above image, the collection marked as “1” indicates the resources created as part of the cluster creation process, whereas “2” indicates the service account we just created.

Now, we need to attach the permissions our workload requires to the IAM role that backs the newly created service account. To find that role, follow these steps:

  1. Go to the EKS cluster UI.
  2. Click on the Resources tab.
  3. Select the Authentication resource type.
  4. Click the ServiceAccount.
  5. Filter the namespace “aws-poc-2”.
  6. Click on the name aws-poc (as provided by you in the above command), and copy the value present in the “Annotations” section. It is your IAM Role ARN.
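
Alternatively, you can read the same annotation directly with kubectl (a quick sketch, assuming the namespace and service account names used above):

# The role ARN appears under the Annotations field of the output
kubectl describe serviceaccount aws-poc -n aws-poc-2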

To allow another IAM user to access the cluster:

eksctl create iamidentitymapping --cluster <cluster_name> --arn <arn of iam user> --group system:masters --username <username>
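
You can then list the identity mappings to confirm the user was added (a quick check):

eksctl get iamidentitymapping --cluster <cluster_name> --region ap-south-1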

Configuring and verifying the kubectl context using eksctl

eksctl configures the kubectl context automatically when it creates a cluster, so you do not need to edit the kubeconfig file by hand. If you ever need to regenerate the context, for example on another machine, the AWS CLI can do it for you.

Image 5: Configuring the kubectl context using eksctl

Use the following command to configure the kubectl context for your cluster:

aws eks --region <region_code> update-kubeconfig --name <cluster_name>

Then, you can verify the context configuration as shown in the image above.
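
For example, the following commands (a minimal check) confirm that kubectl points at the new cluster and can reach it:

kubectl config current-context
kubectl get nodes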

Setting up the Spark Operator

The Spark Operator is a Kubernetes custom controller that simplifies the deployment and management of Apache Spark applications. We can now continue installing the Spark Operator on our cluster using Helm.

We first need to authenticate our Helm client to the Amazon ECR registry that hosts the operator chart. The ECR username is literally AWS, and the token returned by get-login-password is piped in via --password-stdin.

aws ecr get-login-password \
--region ap-south-1 | helm registry login \
--username AWS \
--password-stdin 235914868574.dkr.ecr.ap-south-1.amazonaws.com
Image 6: Authenticating helm client to the Amazon ECR registry

Once the login is successful, we can install the Spark Operator as follows:

helm install spark-operator-demo \
oci://235914868574.dkr.ecr.ap-south-1.amazonaws.com/spark-operator \
--set emrContainers.awsRegion=ap-south-1 \
--version 6.15.0 \
--namespace aws-poc-2 \
--set webhook.enable=true
Image 7: Installing the Spark Operator
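
You can verify the release with Helm before moving on (a quick check, assuming the namespace used above):

helm list --namespace aws-poc-2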

The AWS documentation for the Spark Operator on Amazon EMR on EKS contains more information about installing it on an EKS cluster.

Deploying the job

The YAML configuration file below encapsulates the specification for our Spark job and will serve as the blueprint for the application’s deployment. We have specified our Spark and Hadoop configurations here as well.

# job-phase-2.yaml
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: <your-job-name>
  namespace: <your-namespace>
spec:
  type: Python
  mode: cluster
  deps:
    repositories:
      - "https://mvnrepository.com/artifact/"
    packages:
      - "clojure-interop:com.amazonaws.auth:1.0.0"
  hadoopConf:
    fs.s3.customAWSCredentialsProvider: com.amazonaws.auth.WebIdentityTokenCredentialsProvider
    fs.s3.impl: com.amazon.ws.emr.hadoop.fs.EmrFileSystem
    fs.AbstractFileSystem.s3.impl: org.apache.hadoop.fs.s3.EMRFSDelegate
    fs.s3.buffer.dir: /mnt/s3
    fs.s3.getObject.initialSocketTimeoutMilliseconds: "2000"
    mapreduce.fileoutputcommitter.algorithm.version.emr_internal_use_only.EmrFileSystem: "2"
    mapreduce.fileoutputcommitter.cleanup-failures.ignored.emr_internal_use_only.EmrFileSystem: "true"
  sparkConf:
    spark.driver.extraClassPath: /usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/lib/hadoop/hadoop-auth.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar:/usr/share/java/Hive-JSON-Serde/hive-openx-serde.jar:/usr/share/aws/sagemaker-spark-sdk/lib/sagemaker-spark-sdk.jar:/home/hadoop/extrajars/*
    spark.driver.extraLibraryPath: /usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native:/docker/usr/lib/hadoop/lib/native:/docker/usr/lib/hadoop-lzo/lib/native
    spark.executor.extraClassPath: /usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/lib/hadoop/hadoop-auth.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar:/usr/share/java/Hive-JSON-Serde/hive-openx-serde.jar:/usr/share/aws/sagemaker-spark-sdk/lib/sagemaker-spark-sdk.jar:/home/hadoop/extrajars/*
    spark.executor.extraLibraryPath: /usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native:/docker/usr/lib/hadoop/lib/native:/docker/usr/lib/hadoop-lzo/lib/native
    spark.sql.files.maxPartitionBytes: "19327352832"
    spark.sql.shuffle.partitions: "28"
    spark.memory.fraction: "0.75"
    spark.storage.memoryFraction: "0.25"
    spark.sql.debug.maxToStringFields: "500"
    spark.sql.parquet.fs.optimized.committer.optimization-enabled: "true"
    spark.sql.emr.internal.extensions: com.amazonaws.emr.spark.EmrSparkSessionExtensions
    spark.executor.defaultJavaOptions: -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseParallelGC -XX:InitiatingHeapOccupancyPercent=70 -XX:OnOutOfMemoryError='kill -9 %p'
    spark.driver.defaultJavaOptions: -XX:OnOutOfMemoryError='kill -9 %p' -XX:+UseParallelGC -XX:InitiatingHeapOccupancyPercent=70
  image: "235914868574.dkr.ecr.ap-south-1.amazonaws.com/spark/emr-6.14.0:latest"
  mainApplicationFile: "s3://<bucket_path>/job.py"
  sparkVersion: "3.3.3"
  restartPolicy:
    type: Never
  driver:
    env:
      - name: TEMP_VARIABLE
        value: VARIABLE_VALUE
    cores: 1
    memory: "512m"
    serviceAccount: "emr-containers-sa-spark"
  executor:
    env:
      - name: TEMP_VARIABLE
        value: VARIABLE_VALUE
    cores: 1
    instances: 5
    memory: "2g"
    serviceAccount: "emr-containers-sa-spark"

Here is a sample PySpark job:

# job.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, desc, collect_list

def main():
    spark = SparkSession.builder.appName("UserActivityAnalysis").getOrCreate()
    try:
        # Read the raw user activity data from S3
        input_path = "s3://your-input-data/user_activity_data.csv"
        user_activity_data = spark.read.csv(input_path, header=True, inferSchema=True)

        # Keep only the columns we need
        transformed_data = user_activity_data.select("user_id", "activity_type", "timestamp")

        # Count activities per user and activity type
        activity_counts = transformed_data.groupBy("user_id", "activity_type").agg(count("*").alias("activity_count"))

        # Take the first collected activity type per user (intended as the most frequent one)
        most_common_activity = (
            activity_counts
            .orderBy(desc("activity_count"))
            .groupBy("user_id")
            .agg(collect_list("activity_type").getItem(0).alias("most_common_activity"))
        )

        # Write the result back to S3 as Parquet
        output_path = "s3://your-output-data/user_activity_analysis_result"
        most_common_activity.write.mode("overwrite").parquet(output_path)
        print("Spark job completed successfully.")
    except Exception as e:
        print(f"Error during Spark job execution: {str(e)}")
    finally:
        spark.stop()

if __name__ == "__main__":
    main()

Please note that this Spark code needs to be uploaded to the S3 bucket at the path specified in mainApplicationFile in the YAML file.
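
For example, the script can be uploaded with the AWS CLI (assuming the same bucket path as in the YAML file):

# <bucket_path> must match mainApplicationFile in job-phase-2.yaml
aws s3 cp job.py s3://<bucket_path>/job.py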

Now, with our YAML and PySpark files in place, we can run this command:

kubectl apply -f job-phase-2.yaml
Image 8: Submitting the job

Our job will take some time to become available and start running because the worker nodes first have to pull the Spark Docker image.

Once the job is running, it can be verified using the command shown in the image below:

kubectl get pods -n <your_namespace>
Image 9: Verifying the submitted job in the running state
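
The Spark Operator also exposes a SparkApplication resource whose status you can inspect (a quick sketch, assuming the job name from the YAML file):

kubectl get sparkapplication -n <your_namespace>
kubectl describe sparkapplication <your-job-name> -n <your_namespace>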

To debug the job, you can monitor it by following the driver pod’s logs:

kubectl logs -f <driver_pod_name> -n <your_namespace>
Image 10: Monitoring the job
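
If the driver pod does not start at all, the pod description and namespace events are often more informative than the logs (a couple of hedged debugging commands):

kubectl describe pod <driver_pod_name> -n <your_namespace>
kubectl get events -n <your_namespace> --sort-by=.lastTimestamp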

That’s it for this article. Once you are done with this demo, make sure that you delete the resources.
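
When you are finished, a possible cleanup sequence looks like this (a sketch, assuming the names used throughout this article):

# Delete the Spark job, the operator release, the cluster, and the key pair
kubectl delete -f job-phase-2.yaml
helm uninstall spark-operator-demo --namespace aws-poc-2
eksctl delete cluster --name <cluster_name> --region ap-south-1
aws ec2 delete-key-pair --key-name <keypair_name> --region ap-south-1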

You can also access the Spark UI using port forwarding. Below is the command to expose the Spark UI on localhost:

kubectl port-forward <spark_driver_pod_name> 4040:4040 -n <your_namespace>

Developer Contribution — Kunal Gupta and Aamir Khan

Article Contribution — Shashank Tiwari
