This article is the day-23 entry of the Classi Advent Calendar 2019.
Hello, this is @tomoyanamekawa from the Data/AI unit at Classi. I usually build data analysis platforms on GCP.
Recently I had a case where I wanted to split data in BigQuery into separate files according to the values it contains and save them to GCS, and Cloud Dataflow helped me a lot. Other people seem to want to do the same thing, and there are few implementation examples in Python, so I will summarize what I did.
Export a specific BigQuery table to Google Cloud Storage (GCS) once a day. However, I want to change the destination directory depending on the value of a certain column. The file format is JSON.
I want to take a reservations table in BigQuery and save it to GCS split by date / shop_id, like this.
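To make the goal concrete, the output I am aiming for looks roughly like this (the date and shop_id values are made-up examples; the actual file names inside each directory are generated by Dataflow):

gs://your_bucket/20191223/shop_id_1/...
gs://your_bucket/20191223/shop_id_2/...
gs://your_bucket/20191223/shop_id_3/...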
Cloud Dataflow is a GCP service that lets you run ETL processing without managing servers. Apache Beam runs behind the scenes, so you can think of it as serverless Apache Beam. Because it processes data in parallel, it can handle even large-scale data quickly.
It supports both stream processing and batch processing; this time we will use batch processing. For more information, see the official page.
If you just want to get started quickly, I think the hands-on steps in Yuzutaso's presentation material are a good introduction (that is how I learned it myself).
In Cloud Dataflow, you build an ETL process using what is called a "template". For common processing, the templates provided by Google let you do everything easily from the GUI. However, they cannot do what I want to do this time, so I will create a custom template myself.
By the way, Java or Python can be used as the programming language. This time I will write it in Python, but Java has more features and better documentation, so if you or your team members can write Java and maintenance is not a problem, I think Java is the better choice.
Here are the contents of the custom template.
test_template.py
import os
import json
import datetime
import apache_beam as beam
from apache_beam.io import fileio
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions, GoogleCloudOptions
class JsonSink(fileio.TextSink):
    """Writes each record as one JSON object per line."""
    def write(self, record):
        self._fh.write(json.dumps(record).encode('utf8'))
        self._fh.write('\n'.encode('utf8'))


if __name__ == '__main__':
    now = datetime.datetime.now().strftime('%Y%m%d')
    project_id = 'your_project'
    dataset_name = 'your_dataset'
    table_name = 'your_table'
    bucket_name = 'your_bucket'

    # Pipeline options
    pipeline_options = PipelineOptions()
    google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
    google_cloud_options.project = project_id
    google_cloud_options.job_name = 'myjob'
    google_cloud_options.staging_location = f'gs://{bucket_name}/staging'
    google_cloud_options.temp_location = f'gs://{bucket_name}/temp'
    google_cloud_options.template_location = f'gs://{bucket_name}/templates/test_template'
    pipeline_options.view_as(StandardOptions).runner = 'DataflowRunner'

    # Build the pipeline: read the whole table, then write files grouped by shop_id
    pipeline = beam.Pipeline(options=pipeline_options)
    (pipeline
     | 'read' >> beam.io.Read(beam.io.BigQuerySource(
           project=project_id,
           use_standard_sql=True,
           query=f'select * from `{project_id}.{dataset_name}.{table_name}`'
       ))
     | 'write' >> beam.io.fileio.WriteToFiles(
           path=f'gs://{bucket_name}/{now}',
           destination=lambda record: f"shop_id_{record['shop_id']}/",
           sink=JsonSink(),
           file_naming=beam.io.fileio.destination_prefix_naming()
       )
    )
    pipeline.run()
The point is the Dynamic Destinations feature used in the 'write' step. Each record is passed to the destination callback as the variable record, so you can switch the destination (the prefix of the output file name) per record based on record['shop_id'].
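As a minimal sketch of what that callback does (the record values here are hypothetical), each record is turned into a prefix string, and destination_prefix_naming() then uses that prefix when naming the output files:

# Hypothetical record, for illustration only
record = {'shop_id': 1, 'date': '2019-12-23'}

# The destination callback from the pipeline above
prefix = f"shop_id_{record['shop_id']}/"  # -> 'shop_id_1/'

# Combined with path=f'gs://{bucket_name}/{now}', files for this record
# end up under gs://your_bucket/20191223/shop_id_1/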
The created template needs to be placed on GCS, so run this command:
python -m test_template
The template will then be placed in the location specified by google_cloud_options.template_location.
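If you want to confirm that the template was actually generated (assuming the gsutil CLI is installed and authenticated), you can list the templates directory:

gsutil ls gs://your_bucket/templates/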
You can also set the location of the template at runtime.
Cloud Dataflow itself does not have a scheduler, so to run the job daily it has to be kicked off from the outside. This time we will keep things serverless with Cloud Scheduler + Cloud Pub/Sub + Cloud Functions.
Register the following script in Cloud Functions. It launches the custom template for you.
from googleapiclient.discovery import build


def main(data, context):
    job = 'my_job'
    dataflow = build('dataflow', 'v1b3')
    # Launch a Dataflow job from the template placed on GCS
    request = dataflow.projects().templates().launch(
        projectId='your_project',
        gcsPath='gs://your_bucket/templates/test_template',
        body={'jobName': job}
    )
    response = request.execute()
The Cloud Functions trigger is Pub/Sub.
When Pub/Sub is used as the trigger, the function has to accept two arguments, which is why the signature is main(data, context).
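The script above ignores the message contents, but if you ever need them, a Pub/Sub-triggered function receives the message body base64-encoded in data['data']. A minimal sketch:

import base64

def main(data, context):
    # The Pub/Sub message body arrives base64-encoded
    message = base64.b64decode(data['data']).decode('utf-8') if 'data' in data else ''
    print(message)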
All that is left is to create the Pub/Sub topic that acts as the trigger and have Cloud Scheduler publish to that topic once a day, for example with the commands below.
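One possible setup with gcloud (the topic name, job name, and schedule below are just examples):

# Create the topic that triggers the Cloud Function
gcloud pubsub topics create daily-dataflow-export

# Publish a message to that topic every day at 01:00 JST
gcloud scheduler jobs create pubsub daily-dataflow-export-job \
  --schedule="0 1 * * *" \
  --time-zone="Asia/Tokyo" \
  --topic=daily-dataflow-export \
  --message-body="run"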
If you instead use Cloud Composer, or set up a server and schedule the job with another workflow engine or cron, you can launch the custom template with the gcloud command below.
gcloud dataflow jobs run my_job \
--gcs-location gs://your_bucket/templates/test_template \
--region=asia-northeast1
Building a system yourself that could do this kind of processing at scale in a short time would be daunting, so Cloud Dataflow is very convenient. It is a little expensive, though, so you need to choose where to use it carefully so that your Cloud Dataflow bill does not climb to xx million yen.
Tomorrow's entry is by @tetsuya0617. Looking forward to it!