Apache Beam ETL via Google Cloud Dataflow: From BigQuery to Bigtable

Skuad Engineering
Jan 19, 2023


BigQuery and Bigtable

BigQuery and Bigtable are two powerful data storage and processing services provided by Google Cloud. BigQuery is a fully-managed, serverless data warehouse that allows users to store and query large amounts of data using SQL. Bigtable, on the other hand, is a fully managed, NoSQL, wide-column database service that allows users to store and access large amounts of data in a highly-scalable and performant manner.

In many cases, it may be necessary to move data from BigQuery to Bigtable for further processing or analysis. One way to do this is by using Google Cloud Dataflow, a fully-managed service for creating data pipelines that can move data between storage systems and perform transformations on that data.

Prerequisites

1) BigQuery must contain the table whose data needs to be migrated, and its schema should include a date-related column so that data can be fetched by date.

2) A Bigtable instance must exist, with the target table and its column families already created.

3) The Dataflow job must have permission to read from BigQuery and write to Bigtable.
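
The first prerequisite asks for a date-related column so that data can be fetched by date. A minimal sketch of building such a query string follows. The helper name build_date_query, the column name event_date, and the project, dataset, and table names are hypothetical, and the sketch assumes the column has the BigQuery type DATE. The resulting string could be passed to ReadFromBigQuery as the query argument with use_standard_sql=True.

```python
import re
from datetime import date

# Hypothetical patterns used only to keep unexpected text out of the query.
_PROJECT = re.compile(r"^[a-z][a-z0-9\-]*$")
_NAME = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def build_date_query(project, dataset, table, date_column, day):
    """Return a standard SQL query selecting the rows for a single day.

    Assumption: date_column is a DATE column in the source table.
    """
    if not _PROJECT.match(project):
        raise ValueError("invalid project id: {!r}".format(project))
    for name in (dataset, table, date_column):
        if not _NAME.match(name):
            raise ValueError("invalid identifier: {!r}".format(name))
    return "SELECT * FROM `{0}.{1}.{2}` WHERE {3} = DATE '{4}'".format(
        project, dataset, table, date_column, day.isoformat())


query = build_date_query("my-project", "my_dataset", "my_table",
                         "event_date", date(2023, 1, 19))
print(query)
```

Validating the identifiers before formatting is a design choice for this sketch: table and column names cannot be passed as query parameters, so they are checked against a strict pattern instead.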

Steps to Migrate the data

Here are the high-level steps to move data from BigQuery to Bigtable using Dataflow:

1. Create a Dataflow job. This can be done through the Dataflow UI in the Google Cloud Console or through the Dataflow API. In our case, we built the job with the Apache Beam Python SDK. Apache Beam is a unified programming model for batch and streaming data processing: a pipeline is written once and can run on any supported runner, including Dataflow.

2. In the Dataflow job, create a BigQuery input source to read the data from the BigQuery table. This is done by specifying the BigQuery project, dataset, and table to read from, or a query.

import apache_beam as beam
# p is the beam.Pipeline object the job is built on
data = p | 'Read from BigQuery' >> beam.io.ReadFromBigQuery(query='SELECT * FROM my_dataset.my_table')

3. Perform any necessary transformations on the data. This can include things like filtering, aggregating, or converting the data to a different format.

# Perform transformations on data
# Example: Filter rows with a specific value in a column
data = data | 'Filter rows' >> beam.Filter(lambda row: row['column_name'] == 'specific_value')

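4. To write the data to Bigtable, each record must first be converted into a DirectRow object, so that the pipeline produces a collection of DirectRow objects.

# Transform each record into a Bigtable DirectRow object.
# DirectRow takes a row key, not the record, so the cells are set in a helper you define (to_direct_row is a placeholder name).
transformed_data = data | 'Convert to DirectRow' >> beam.Map(to_direct_row)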
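
A DirectRow is created from a row key and then filled cell by cell, so the conversion in step 4 can be sketched as below. This is a sketch under stated assumptions rather than the exact code used in the pipeline: the names record_to_cells and to_direct_row, the id field used as the row key, and the column family cf1 are all hypothetical. The google-cloud-bigtable import is done lazily so that the pure-Python part can be tried without the library installed.

```python
from datetime import date, datetime, timezone


def record_to_cells(record, column_family="cf1"):
    """Flatten a BigQuery record (dict) into (family, qualifier, value) cells.

    Assumptions: every scalar column goes into one column family, and every
    value is stored as UTF-8 text (so integers are stored as digits, not as
    64-bit binary values).
    """
    cells = []
    for key, value in record.items():
        if value is None or isinstance(value, (list, dict)):
            continue  # nulls and nested/repeated fields are skipped in this sketch
        if isinstance(value, bytes):
            data = value
        elif isinstance(value, (date, datetime)):
            data = value.isoformat().encode("utf-8")
        else:
            data = str(value).encode("utf-8")
        cells.append((column_family, key.encode("utf-8"), data))
    return cells


def to_direct_row(record, key_field="id"):
    """Build a DirectRow keyed by record[key_field] (a hypothetical unique field)."""
    from google.cloud.bigtable import row  # lazy import: needs google-cloud-bigtable

    direct_row = row.DirectRow(row_key=str(record[key_field]).encode("utf-8"))
    timestamp = datetime.now(timezone.utc)
    for family, qualifier, value in record_to_cells(record):
        direct_row.set_cell(family, qualifier, value, timestamp=timestamp)
    return direct_row


example = {"id": 42, "name": "Ada", "joined": date(2023, 1, 19),
           "tags": ["a"], "note": None}
print(record_to_cells(example))
```

Deriving the row key from a unique field matters because records that share a row key are written to the same Bigtable row.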

5. Create a Bigtable output sink to write the data to Bigtable. This is done by specifying the project, Bigtable instance, and table to write to.

from apache_beam.io.gcp.bigtableio import WriteToBigTable
# Write data to Bigtable
transformed_data | 'Write to Bigtable' >> WriteToBigTable(project_id='my-project', instance_id='my-bigtable-instance', table_id='my-table')

6. Run the Dataflow job to move the data from BigQuery to Bigtable.

export GOOGLE_APPLICATION_CREDENTIALS=<Pass the PATH to the service Account which has all access to Google Services>

python <PYTHON BEAM FILENAME>.py \
--runner DataflowRunner \
--service_account_email="<Service Account Email>" \
--project <GCP Project on which Dataflow will spawn> \
--temp_location <GCS Temporary Location> \
--staging_location <GCS Staging Location> \
--region <GCP REGION> \
--subnetwork <GCP SUBNETWORK> \
--machine_type <GCP Machine Type> \
--disk_size_gb <Dataflow Worker Disk Size> \
--max_num_workers <Maximum Number of Worker to Run> \
--num_workers <Initial Number of Worker to Spawn>

A complete example is given below for clarity:

import argparse
import decimal
import json
import logging
from datetime import datetime, date
import apache_beam as beam
from apache_beam.io.gcp.bigtableio import WriteToBigTable
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from google.cloud.bigtable import row
from pandas._libs.tslibs import timestamps

argv = None

parser = argparse.ArgumentParser()

# --repeated_columns and --col_family_names are JSON strings, parsed in run().
parser.add_argument(
    '--repeated_columns',
    required=True,
    help="JSON-encoded names of the repeated columns"
)

parser.add_argument(
    '--col_family_names',
    required=True,
    help="JSON dictionary mapping 'base' and each repeated column to a column family name"
)

parser.add_argument(
    '--bq_project_id',
    required=True,
    help="BigQuery project ID"
)

parser.add_argument(
    '--bq_schema',
    required=True,
    help="BigQuery dataset name"
)

parser.add_argument(
    '--bq_table',
    required=True,
    help="BigQuery table name"
)

parser.add_argument(
    '--bq_date_id',
    required=True,
    help="Date identifier for date-based fetching (not used in this example)"
)

parser.add_argument(
    '--bt_project_id',
    required=True,
    help="Bigtable project ID"
)

parser.add_argument(
    '--bt_table_id',
    required=True,
    help="Bigtable table ID"
)

parser.add_argument(
    '--bt_instance_id',
    required=True,
    help="Bigtable instance ID"
)

known_args, pipeline_args = parser.parse_known_args(argv)

class RowTransformationToBigtable(beam.DoFn):
    """Converts a BigQuery record (dict) into a Bigtable DirectRow."""

    def __init__(self, col_family_mapping, repeated_columns):
        self.col_family_mapping = col_family_mapping
        self.repeated_columns = repeated_columns

    def process(self, record):
        # logging.info("Element is {0}".format(record))
        # NOTE: "some_key" is a placeholder. Use a key that is unique per record,
        # otherwise every record is written to the same Bigtable row.
        direct_row = row.DirectRow(row_key="some_key")
        timestamp = datetime.now()

        def conversion_of_value(value):
            # Convert dates, floats, and decimals to strings so they can be stored as cell values.
            if type(value) == date or type(value) == timestamps.Timestamp:
                value = value.isoformat()
            if type(value) == float or type(value) == decimal.Decimal:
                value = str(value)
            if type(value) == datetime:
                value = value.strftime("%Y-%m-%d")
            return value

        for key, value in record.items():
            value = conversion_of_value(value)
            if type(value) == list and key in self.repeated_columns:  # Repeated
                for repeated_dict in value:
                    new_timestamp = datetime.now()
                    for repeated_key, repeated_value in repeated_dict.items():
                        repeated_value = conversion_of_value(repeated_value)
                        if repeated_value is not None and type(repeated_value) is not list:
                            direct_row.set_cell(
                                self.col_family_mapping[key],
                                repeated_key,
                                repeated_value,
                                timestamp=new_timestamp)

            if value is not None and type(value) is not list:
                direct_row.set_cell(
                    self.col_family_mapping['base'],
                    key,
                    value,
                    timestamp=timestamp)

        yield direct_row
        # yield record

def run(save_main_session=True):
    """
    Build and run the BigQuery-to-Bigtable pipeline.
    """
    col_family_names = json.loads(known_args.col_family_names)
    repeated_columns = json.loads(known_args.repeated_columns)
    pipeline_options = PipelineOptions(pipeline_args)
    google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
    # Pickle the main session so module-level imports are available on the workers.
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

    with beam.Pipeline(options=pipeline_options) as p:
        bq_data = p | "Read From Bigquery" >> beam.io.ReadFromBigQuery(
            query='SELECT * FROM `{0}.{1}.{2}`'.format(known_args.bq_project_id,
                                                       known_args.bq_schema,
                                                       known_args.bq_table),
            use_standard_sql=True)
        list_of_directRow = bq_data | "Row Transformation for Bigtable" >> beam.ParDo(
            RowTransformationToBigtable(col_family_names, repeated_columns))
        result = list_of_directRow | "Write to Bigtable" >> WriteToBigTable(
            project_id=known_args.bt_project_id,
            instance_id=known_args.bt_instance_id,
            table_id=known_args.bt_table_id)


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

To convert the data into DirectRow objects, we use the RowTransformationToBigtable DoFn. It writes scalar columns to the 'base' column family and writes each repeated column to its own column family, matching the column families already created in Bigtable.
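
The --col_family_names and --repeated_columns arguments are parsed with json.loads, so they are passed as JSON strings. The sketch below shows one plausible shape for them, inferred from how the code indexes them. The column name addresses and the family names cf_base and cf_addresses are made up for illustration, and repeated_columns is shown as a list, although a dictionary with the same keys would pass the same membership test.

```python
import json

# Hypothetical argument values, as they might be passed on the command line.
col_family_names_arg = '{"base": "cf_base", "addresses": "cf_addresses"}'
repeated_columns_arg = '["addresses"]'

col_family_names = json.loads(col_family_names_arg)
repeated_columns = json.loads(repeated_columns_arg)


def family_for(column, value):
    """Mirror how the DoFn in the complete example picks a column family."""
    if isinstance(value, list):
        # Only lists named in repeated_columns are written; other lists are skipped.
        return col_family_names[column] if column in repeated_columns else None
    return col_family_names["base"]  # scalar columns go to the 'base' family


print(family_for("name", "Ada"))
print(family_for("addresses", [{"city": "Pune"}]))
print(family_for("tags", ["x"]))
```

Every family named in the mapping has to exist in the Bigtable table before the job runs, as stated in the prerequisites.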

Note that the Dataflow job needs authentication set up so that it has permission to read from BigQuery and write to Bigtable. You can authenticate with a service account key or with Application Default Credentials (ADC). We authenticate through Google Cloud, using the pipeline's google_cloud_options.
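
Before launching the job, it can help to confirm which service account the key file belongs to. The following is a small standard-library sketch, not part of the original pipeline. The helper name describe_key_file is hypothetical, and the sketch relies only on the type and client_email fields that a service account key file contains.

```python
import json
import os


def describe_key_file(path):
    """Return (type, client_email) read from a service account key file."""
    with open(path, "r", encoding="utf-8") as handle:
        info = json.load(handle)
    return info.get("type"), info.get("client_email")


key_path = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")
if key_path and os.path.isfile(key_path):
    print(describe_key_file(key_path))
else:
    print("GOOGLE_APPLICATION_CREDENTIALS is not set or does not point to a file")
```

The printed email can then be compared with the value passed to --service_account_email.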

In conclusion, this pipeline shows how to move data from BigQuery to Bigtable using Google Cloud Dataflow, with the code written using the Apache Beam framework.

Developer Contribution — Nitesh Kumar Singh, Kunal Gupta and Aamir Khan
