Integration of Google Cloud DataProc with Cloud BigQuery

Skuad Engineering
Jan 23, 2023


PySpark

PySpark is the Python library for Spark programming. PySpark allows users to interface with the Spark framework using the Python programming language. PySpark provides an API for operating on large datasets in a distributed computing environment. It lets users perform distributed data processing using the familiar Python programming paradigm and provides support for RDDs (Resilient Distributed Datasets), the fundamental data structure in Spark. PySpark also supports reading from and writing to a variety of data sources, including HDFS, Hive, and HBase.
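As a quick illustration (not tied to BigQuery yet), here is a minimal PySpark snippet that builds a small DataFrame and runs a distributed filter; the data is purely illustrative:

from pyspark.sql import SparkSession

# Create (or reuse) a SparkSession, the entry point to the DataFrame API
spark = SparkSession.builder.appName("pyspark-intro").getOrCreate()

# A small DataFrame built from in-memory data
df = spark.createDataFrame(
    [("alice", 34), ("bob", 45)],
    ["name", "age"],
)

# Transformations are lazy; show() triggers distributed execution
df.filter(df.age > 40).show()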

Google Cloud DataProc

Google Cloud Dataproc is a fully-managed cloud service for running Apache Hadoop and Apache Spark workloads. It allows users to easily create and manage Hadoop and Spark clusters on the Google Cloud Platform (GCP). With Dataproc, users can spin up a cluster in just a few minutes and can use it for a variety of tasks, such as data processing, machine learning, and data analysis.

Dataproc provides several features that make it easy to use with GCP services. For example, it allows data to be stored in Google Cloud Storage, BigQuery, and Cloud SQL, and it also provides integration with other GCP services such as Cloud Dataflow and Cloud ML Engine. Additionally, Dataproc supports custom images, so users can bring their own software and dependencies, and it allows the use of Jupyter and Zeppelin notebooks for interactive data exploration and analysis.

Overall, Dataproc is a cost-effective and easy-to-use solution for running big data workloads on GCP, allowing users to focus on their data processing and analysis tasks, rather than the underlying infrastructure.

BigQuery

BigQuery is a fully managed, cloud-based data warehouse service provided by Google Cloud. It allows users to store, query, and analyze large amounts of data using SQL. BigQuery is designed to handle very large datasets and can process terabytes of data in seconds. It is fully scalable and can handle both real-time data streaming and batch data processing. BigQuery integrates with other Google Cloud services, such as Google Analytics and Google Cloud Storage, to provide a complete data analysis platform. It also provides a web UI, a command-line interface, and APIs for accessing and managing data, and it is widely used for business intelligence, machine learning, data warehousing, and analytics.

PySpark with BigQuery

PySpark provides a Python interface for programming Spark, and BigQuery is a fully managed, cloud-native data warehouse that enables fast SQL queries using the processing power of Google’s infrastructure.

To use PySpark with BigQuery, you first need to have the Google Cloud SDK and BigQuery command-line tools installed on your machine. Then, you need to create a new Google Cloud project or use an existing one, and enable the BigQuery API. Next, you need to create a service account key for your project and download the JSON file. In your PySpark script, you will set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file you downloaded.

export GOOGLE_APPLICATION_CREDENTIALS=<path-to-service-account.json>
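If you prefer to set the credentials from inside the Python script instead of the shell, a minimal equivalent looks like this (the path is a placeholder):

import os

# Point the Google client libraries at the service account key file
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/path/to/service-account.json"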

In your PySpark script, you will also need the following imports:

from pyspark.sql import SparkSession 
from pyspark.sql.functions import *

After that, you will create a SparkSession and set the BigQuery configuration options. This includes specifying the project ID, a temporary GCS bucket, and the version of the spark-bigquery-with-dependencies library you want to use.

Once the SparkSession is created, you can use the spark.read.format method to read data from BigQuery into a DataFrame. You can then perform operations on the DataFrame as you would with any other PySpark DataFrame. When you are done, you can write the data from the PySpark DataFrame back to BigQuery using df.write.format("bigquery").

With this setup, you can leverage the power of PySpark to perform distributed data processing and the power of BigQuery for fast SQL queries on large datasets. This allows you to easily do things like running complex data analysis or building machine learning models on top of your BigQuery data.

Read Operation from BigQuery with PySpark

Here are the steps for connecting PySpark to BigQuery:

Make sure you have the Google Cloud SDK and BigQuery command-line tools installed on your machine.

Create a new Google Cloud project or use an existing one, and enable the BigQuery API.

Create a service account key for your project and download the JSON file.

In your PySpark script, set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file you downloaded in the previous step.

export GOOGLE_APPLICATION_CREDENTIALS=<path-service-account-json>

Add the following imports to your PySpark script:

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

Create a SparkSession and set the BigQuery configuration options.

spark = SparkSession.builder \
    .appName("PySpark_BigQuery_Interaction") \
    .config("spark.jars.packages", "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.17.1") \
    .config("spark.bigquery.project", "<your_project_id>") \
    .config("spark.bigquery.tempGcsBucket", "<your_temp_gcs_bucket>") \
    .getOrCreate()

Use the spark.read.format method to read data from BigQuery into a DataFrame.

df = spark.read.format("bigquery") \
    .option("table", "<your_project_id>.<your_dataset>.<your_table>") \
    .load()

Perform operations on the DataFrame as you would with any other PySpark DataFrame.
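For example, on the DataFrame df read above, a filter and aggregation might look like this (the country and revenue columns are hypothetical):

from pyspark.sql import functions as F

# Filter rows and aggregate per group; the column names are illustrative
result = (
    df.filter(F.col("revenue") > 0)
      .groupBy("country")
      .agg(F.sum("revenue").alias("total_revenue"))
)
result.show(10)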

Write Operation to BigQuery with PySpark

Now let’s move on to the write operation using PySpark with BigQuery tables. To write data from a PySpark DataFrame to BigQuery, you can use:

df.write.format("bigquery") \
    .option("table", "<your_project_id>.<your_dataset>.<your_table>") \
    .save()

Please keep in mind that you need to replace the placeholders like your_project_id and your_temp_gcs_bucket with your actual project ID and GCS bucket name.
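Depending on the connector version, the indirect write path also expects a temporary GCS bucket, and you may want to choose a save mode explicitly. A hedged variant of the write above, with placeholder names:

# Write the DataFrame to BigQuery, staging the data in a temporary GCS bucket
df.write.format("bigquery") \
    .option("table", "<your_project_id>.<your_dataset>.<your_table>") \
    .option("temporaryGcsBucket", "<your_temp_gcs_bucket>") \
    .mode("overwrite") \
    .save()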

Google Cloud DataProc with BigQuery

Google Cloud Dataproc can be used in conjunction with BigQuery to perform big data processing and analysis tasks. BigQuery is a fully-managed, cloud-native data warehouse that allows users to easily analyze large amounts of data using SQL. When used together, Dataproc and BigQuery provide a powerful solution for performing big data processing and analysis tasks in the cloud.

One common use case for using Dataproc and BigQuery together is to process large datasets stored in BigQuery using Spark. Users can spin up a Dataproc cluster, use Spark to process the data stored in BigQuery, and then write the results back to BigQuery or other data storage for further analysis.

Another use case is to use BigQuery as a data source for machine learning tasks, with Dataproc providing the distributed computing power to train models on the data stored in BigQuery. Once the model is trained, it can be deployed for online or batch prediction or to run on Cloud ML Engine.
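As a hedged illustration of that use case (not the exact pipeline described above), here is a minimal Spark MLlib sketch that trains a model on data read from BigQuery; it assumes the SparkSession and connector are configured as shown earlier, and the table and column names are placeholders:

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression

# Read the (hypothetical) training table from BigQuery
train_df = spark.read.format("bigquery") \
    .option("table", "<your_project_id>.<your_dataset>.<your_training_table>") \
    .load()

# Assemble illustrative numeric columns into a single feature vector
assembler = VectorAssembler(inputCols=["feature_a", "feature_b"], outputCol="features")
train_vec = assembler.transform(train_df)

# Fit a simple logistic regression model on the cluster
model = LogisticRegression(featuresCol="features", labelCol="label").fit(train_vec)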

Additionally, Dataproc also provides integration with other GCP services like Cloud Dataflow and Cloud ML Engine, which allow you to perform additional data processing and analysis tasks on the data stored in BigQuery, such as data cleaning, transformation, and feature engineering, as well as machine learning model training and deployment.

Overall, using Dataproc and BigQuery together provides a powerful solution for big data processing and analysis tasks in the cloud, allowing users to easily and efficiently process large datasets stored in BigQuery.

Optimization for PySpark Configuration

There are several techniques that can be used to optimize the performance of PySpark jobs:

  1. Use broadcast variables: Broadcast variables let you cache a read-only variable on each worker node, reducing the amount of network traffic and improving performance (see the sketch after this list).
  2. Persist intermediate RDDs: Persist intermediate RDDs in memory or on disk to avoid recomputing them multiple times.
  3. Partition data: Use the repartition() or coalesce() method to adjust the number of partitions and ensure that data is evenly distributed across the cluster.
  4. Use the right serializer: PySpark’s default serializer (Java serialization) can be slow; a more efficient serializer like Kryo can significantly improve performance.
  5. Use DataFrame and Dataset: The DataFrame and Dataset APIs are built on top of RDDs and provide a more efficient way of handling structured data; they also allow Spark to optimize the query plan and improve performance.
  6. Set the appropriate level of parallelism: You can use the spark.default.parallelism configuration property to control the level of parallelism for all operations.
  7. Optimize Spark’s memory usage: You can configure the spark.executor.memory and spark.driver.memory properties to ensure that Spark has enough memory to operate efficiently.
  8. Use the right file format: Using a file format that is optimized for read and write performance can improve the performance of your PySpark job. For example, using Parquet instead of CSV can improve both read and write performance.
  9. Optimize the cluster: Use the right number of worker nodes and configure them with appropriate resources. Also, make sure that the cluster is running a recent version of Spark and the appropriate version of Hadoop.
  10. Monitor your job performance: Use Spark’s web UI or a tool like Ganglia or Graphite to monitor your job’s performance and identify any bottlenecks.
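Here is a hedged sketch that ties a few of these techniques together (the broadcast lookup, persisting an intermediate result, repartitioning, and the Kryo serializer); the data and numbers are purely illustrative:

from pyspark import StorageLevel
from pyspark.sql import SparkSession

# Kryo serializer (technique 4) must be set before the session is created
spark = SparkSession.builder \
    .appName("optimization-sketch") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .getOrCreate()

# Technique 1: broadcast a small read-only lookup table to every executor
lookup = spark.sparkContext.broadcast({0: "even", 1: "odd"})

# Technique 3: rebalance an illustrative dataset across a chosen number of partitions
df = spark.range(1000000).repartition(200)

# Technique 2: persist an intermediate result that is reused more than once
df.persist(StorageLevel.MEMORY_AND_DISK)
print(df.count(), lookup.value[0])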
# sample pyspark config
spark = SparkSession.builder.appName('my-app-config') \
    .config('spark.sql.debug.maxToStringFields', 100000) \
    .config('spark.sql.shuffle.partitions', '400') \
    .config('spark.jars', 'gs://edm-c360-dev-6663-etl-dev-standard/jars/spark-3.1-bigquery-0.27.1-preview.jar') \
    .config('spark.sql.inMemoryColumnarStorage.compressed', True) \
    .config('spark.sql.autoBroadcastJoinThreshold', -1) \
    .config('spark.storage.memoryFraction', '0.6') \
    .config('spark.dynamicAllocation.enabled', 'true') \
    .config('maxParallelism', 10000) \
    .config('preferredMinParallelism', 2700) \
    .config('spark.executor.memory', '8g') \
    .config('spark.executor.cores', 5) \
    .config('spark.executor.instances', 4) \
    .config('spark.executor.memoryOverhead', 4000) \
    .config('spark.driver.memoryOverhead', '1g') \
    .config('spark.driver.memory', '4g') \
    .config("spark.dynamicAllocation.minExecutors", 4) \
    .config("spark.dynamicAllocation.maxExecutors", 30) \
    .config("spark.dynamicAllocation.initialExecutors", 4) \
    .getOrCreate()

Here I’ll explain the actual meaning of each config set in the application context.

spark.sql.shuffle.partitions :
When an operation that requires shuffling is performed on a DataFrame (or in a SQL query), the output is divided into a number of partitions equal to the value of spark.sql.shuffle.partitions. Each partition is processed by a separate task on a separate executor. By increasing the number of partitions, Spark can process the data in parallel and improve the performance of a job.
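For example, the setting can be changed at runtime on an existing session (here the spark session built above), and the effect shows up in the number of partitions a shuffle produces; note that adaptive query execution, which is on by default in Spark 3, may coalesce these partitions to fewer than 400:

from pyspark.sql.functions import col

# Change the shuffle partition count at runtime
spark.conf.set("spark.sql.shuffle.partitions", "400")

# A groupBy triggers a shuffle, so the shuffled output is split into up to 400 partitions
grouped = spark.range(10000).groupBy((col("id") % 10).alias("bucket")).count()
print(grouped.rdd.getNumPartitions())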

spark.storage.memoryFraction :
If the job is memory-bound (running out of memory), reduce the value; if the job is CPU-bound (not running out of memory), increase the value.
Monitor the cluster’s performance again and check whether CPU utilization improves.
Repeat this process until you find the optimal value for spark.storage.memoryFraction.
Note → The optimal value for spark.storage.memoryFraction depends on the specific use case, and it may change based on the data size and cluster resources.

spark.dynamicAllocation.enabled :
When dynamic allocation is enabled, Spark will use a scheduler algorithm to determine when to add or remove executors based on the job’s resource requirements. The scheduler takes into account the current state of the cluster, the resource usage of the running tasks, and the available cluster resources to decide whether to add or remove executors.

maxParallelism :
In this configuration, this option caps the number of partitions that the data read from BigQuery is split into (preferredMinParallelism sets the preferred lower bound). Each partition is processed by a separate task on a separate executor, so by increasing the number of partitions, Spark can process the data in parallel and improve the performance of a job.
However, having a high number of partitions can lead to a high number of tasks and cause the cluster to run out of resources like memory. Also, if the data size is small, it can lead to over-partitioning, which can also cause performance issues.

spark.executor.memory :
The spark.executor.memory property sets the amount of memory that is allocated to each executor. The value should be a string that includes a unit of memory, such as "512m" for 512 megabytes or "8g" for 8 gigabytes. The default value is "1g", which means that each executor is allocated 1 gigabyte of memory.
It’s important to set spark.executor.memory to an appropriate value to ensure that Spark has enough memory to operate efficiently. If the value is too low, Spark may not be able to store all of the data it needs in memory, which can lead to poor performance. If the value is too high, Spark may consume too much memory and cause other processes on the worker nodes to be starved of memory.

spark.executor.cores :
The spark.executor.cores property sets the number of CPU cores that are allocated to each executor. The value of this property should be an integer, and the default value is 1.

spark.executor.instances :
The spark.executor.instances property sets the number of executors that are created for a job. The value of this property should be an integer. When a job is run, Spark will create this number of executors and assign them to the worker nodes in the cluster.

spark.executor.memoryOverhead :
The spark.executor.memoryOverhead property sets the amount of off-heap memory that is allocated to each executor. The value should be a string that includes a unit of memory, such as "512m" for 512 megabytes or "8g" for 8 gigabytes; a plain number is interpreted as megabytes. The default value is 10% of the executor memory or 384 megabytes, whichever is larger.

spark.driver.memoryOverhead :
The spark.driver.memoryOverhead property sets the amount of off-heap memory that is allocated to the driver program. The value should be a string that includes a unit of memory, such as "512m" for 512 megabytes or "8g" for 8 gigabytes; a plain number is interpreted as megabytes. The default value is 10% of the driver memory or 384 megabytes, whichever is larger.

spark.driver.memory :
The spark.driver.memory property sets the amount of memory that is allocated to the driver program. The value should be a string that includes a unit of memory, such as "512m" for 512 megabytes or "8g" for 8 gigabytes. The default value is "1g", which means that the driver program is allocated 1 gigabyte of memory.

spark.dynamicAllocation.initialExecutors, maxExecutors and minExecutors :
spark.dynamicAllocation.initialExecutors sets the initial number of executors that should be created and available when a Spark application starts. This property should be set to an integer value.
spark.dynamicAllocation.maxExecutors sets the maximum number of executors that can be created by Spark. This property should be set to an integer value. When dynamic allocation is enabled, Spark will not create more than this number of executors.
spark.dynamicAllocation.minExecutors sets the minimum number of executors that should be available in the cluster at any given time. This property should be set to an integer value. When dynamic allocation is enabled, Spark will always try to maintain at least this number of executors running.

Note → It’s recommended to monitor the cluster’s performance using Spark’s web UI or a monitoring tool like Ganglia or Graphite to check resource usage and tune the configuration accordingly; this is just a basic idea.
For your use case, this configuration may look different.

In conclusion, we have covered the essential configurations of Apache Spark (PySpark), reading from and writing to Google BigQuery, and Google Dataproc (Google’s managed service for Apache Spark).

In the future, I’ll write another blog specifically about how BigQuery and PySpark work together.

Hope you liked this. Cheers!!!
Thank you 🙏

Developer Contribution → Aamir Khan and Kunal Gupta
