Practice: building a data analysis platform with BigQuery and Cloud Dataflow (preparation)
Last time I wrote that I would use Scio instead of Java or Python, but for practice I tried Java and Python first. I did not get as far as Scio this time.
What I want to do is **read the raw data stored in BigQuery, process it, and write the result back into BigQuery**.
The environment is as follows.
Python 2.7.13 + Apache Beam 2.5.0
Java 1.8.0_25 + Apache Beam 2.4.0
When I searched on Google, I had a lot of trouble because many of the hits were articles about the Cloud Dataflow 1.x SDK. Even the official documentation still leans heavily on 1.x. In the end, I think reading the Apache Beam documentation thoroughly was the best approach.
Also, the Japanese and English versions of the official GCP documentation are at different revisions, and the Japanese one is often out of date. The following Chrome extension helps with this: GCP outdated docs checker
First of all, running the WordCount sample from the official documentation gives you a feel for the overall flow.
Starting from the raw tweet data that I loaded into BigQuery last time, the program breaks down the user information and outputs the user ID, user name, and screen name. The following is the executable program.
The tweet.SiroTalk table is the input and the tweet.SiroTalkPython table is the output.
parseuser.py
from __future__ import absolute_import

import argparse
import json
import logging
from datetime import datetime

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


def parse_user(element):
    """Flatten one input row: decode the user JSON and format the timestamp."""
    tweet_id = element['tweet_id']
    ct = element['ct']
    full_text = element['full_text']
    user = element['user']

    # The user column holds the user object as a JSON string.
    user_json = json.loads(user)
    user_id = user_json['id']
    user_screen_name = user_json['screen_name']
    user_name = user_json['name']

    # ct is epoch seconds; fromtimestamp() converts it using the local time zone.
    create_time = datetime.fromtimestamp(ct).strftime('%Y-%m-%d %H:%M:%S')

    return {
        'tweet_id': tweet_id,
        'create_time': create_time,
        'full_text': full_text,
        'user_id': user_id,
        'user_screen_name': user_screen_name,
        'user_name': user_name
    }


def run(argv=None):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)
    # The pipeline options are hard-coded here for practice.
    pipeline_args.extend([
        # '--runner=DataflowRunner',
        '--runner=DirectRunner',
        '--project=<project-id>',
        '--staging_location=<bucket_path>/staging',
        '--temp_location=<bucket_path>/temp',
        '--job_name=<job_name>',
    ])

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True

    with beam.Pipeline(options=pipeline_options) as p:
        query = 'SELECT tweet_id, ct, full_text, user FROM tweet.SiroTalk'
        (p
         | 'read' >> beam.io.Read(beam.io.BigQuerySource(
             project='<project_name>', use_standard_sql=False, query=query))
         | 'modify' >> beam.Map(parse_user)
         | 'write' >> beam.io.Write(beam.io.BigQuerySink(
             'tweet.SiroTalkPython',
             schema=('tweet_id:INTEGER, create_time:DATETIME, full_text:STRING, '
                     'user_id:INTEGER, user_screen_name:STRING, user_name:STRING'),
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
             write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
Run
$ python parseuser.py
It is more common to pass these values as parameters at run time, but this time I hard-coded them for practice.
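As a rough sketch of what running with parameters could look like, the hard-coded values can be moved into `argparse` options, with everything else passed through to Beam. The option names `--input_query` and `--output_table`, and the table name `tweet.SiroTalkTest` in the usage lines, are hypothetical names chosen for this illustration; they are not part of the program above.

```python
import argparse


def parse_args(argv=None):
    """Split command-line arguments into our own options and Beam's options."""
    parser = argparse.ArgumentParser()
    # Hypothetical option names, chosen only for this sketch.
    parser.add_argument(
        '--input_query',
        default='SELECT tweet_id, ct, full_text, user FROM tweet.SiroTalk',
        help='Query that selects the raw rows.')
    parser.add_argument(
        '--output_table',
        default='tweet.SiroTalkPython',
        help='Destination table as dataset.table.')
    # parse_known_args returns the arguments it does not recognize
    # (e.g. --runner, --project) so they can be handed to PipelineOptions.
    known_args, pipeline_args = parser.parse_known_args(argv)
    return known_args, pipeline_args


known_args, pipeline_args = parse_args([
    '--output_table=tweet.SiroTalkTest',  # hypothetical table name
    '--runner=DirectRunner',
])
print(known_args.output_table)  # tweet.SiroTalkTest
print(pipeline_args)            # ['--runner=DirectRunner']
```

With this split, `known_args.input_query` and `known_args.output_table` would replace the string literals in the pipeline, and `pipeline_args` would go to `PipelineOptions` as before.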
The data is processed in parse_user(element). element contains one row of the table; build the shape you want to output here and return it.
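To make the shape of `element` concrete, here is a minimal sketch that applies the same kind of transformation to a single hand-written row, without Beam. The sample values are made up. As an assumption for this sketch, the time zone is fixed to UTC+9 so the result does not depend on the machine's settings (the program above uses the local time zone), and it relies on `datetime.timezone`, so it needs Python 3.

```python
import json
from datetime import datetime, timedelta, timezone

# Assumption for this sketch: format timestamps in JST (UTC+9).
JST = timezone(timedelta(hours=9))


def parse_user(element):
    """Flatten the JSON string in the 'user' column into top-level fields."""
    user = json.loads(element['user'])
    create_time = datetime.fromtimestamp(element['ct'], JST)
    return {
        'tweet_id': element['tweet_id'],
        'create_time': create_time.strftime('%Y-%m-%d %H:%M:%S'),
        'full_text': element['full_text'],
        'user_id': user['id'],
        'user_screen_name': user['screen_name'],
        'user_name': user['name'],
    }


# A made-up row, shaped like one result row of the input query.
row = {
    'tweet_id': 1,
    'ct': 1533458598,
    'full_text': 'hello',
    'user': '{"id": 42, "screen_name": "example", "name": "Example User"}',
}
print(parse_user(row))
```

Each dictionary returned this way becomes one row of the output table, so its keys have to match the column names in the output schema.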
For Python, I couldn't find a way to make the BigQuery output a partitioned table. Without partitioning, the amount of data a query scans, and therefore what it costs to run, changes considerably, which is a problem. Also, only Python 2 is supported.
Table after processing
Java is just as easy if you start from the sample. One difference is that, unlike Python, a Java pipeline cannot be built from a single file. It may be easier to start from the first-dataflow sample project.
I modified the sample WordCount.java.
ParseUser.java
package com.example;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.api.services.bigquery.model.TimePartitioning;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.json.JSONObject;

public class ParseUser {

    /** Pipeline options. Only the schema file is configurable for now. */
    public interface ParseUserPipelineOptions extends GcpOptions {
        // @Description("BigQuery dataset name")
        // @Default.String("tweet")
        // String getDataset();
        // void setDataset(String dataset);
        //
        // @Description("BigQuery input table name")
        // @Default.String("SiroTalkTest2")
        // String getInputTable();
        // void setInputTable(String table);

        @Description("BigQuery table schema file")
        @Default.String("schema.json")
        String getSchemaFile();
        void setSchemaFile(String schemaFile);

        // @Description("BigQuery output table name")
        // @Default.String("SiroTalkJava")
        // String getOutputTable();
        // void setOutputTable(String outputTable);
    }

    /** Flattens one input row: decodes the user JSON and formats the timestamp. */
    public static class ParseUserFn extends DoFn<TableRow, TableRow> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            TableRow tweet = c.element();
            Long tweetId = Long.parseLong(tweet.get("tweet_id").toString());
            Long ct = Long.parseLong(tweet.get("ct").toString());
            String fullText = tweet.get("full_text").toString();
            String user = tweet.get("user").toString();

            // ct is epoch seconds; format it as a DATETIME string in Asia/Tokyo.
            Instant instant = Instant.ofEpochSecond(ct);
            String tweetDateString = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
                    .format(LocalDateTime.ofInstant(instant, ZoneId.of("Asia/Tokyo")));

            // The user column holds the user object as a JSON string.
            JSONObject userJson = new JSONObject(user);
            Long userId = Long.parseLong(userJson.get("id").toString());
            String screenName = userJson.get("screen_name").toString();
            String name = userJson.get("name").toString();

            TableRow outputRow = new TableRow()
                    .set("tweet_id", tweetId)
                    .set("ct", ct)
                    .set("create_time", tweetDateString)
                    .set("full_text", fullText)
                    .set("user_id", userId)
                    .set("user_screen_name", screenName)
                    .set("user_name", name);
            c.output(outputRow);
        }
    }

    public static void main(String[] args) throws IOException {
        final ParseUserPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
                .withValidation()
                .as(ParseUserPipelineOptions.class);

        // Read the output table schema from the JSON file.
        final String tableSchemaJson = new String(
                Files.readAllBytes(Paths.get(options.getSchemaFile())), Charset.forName("UTF-8"));
        final TableSchema tableSchema = new TableSchema().setFields(
                new ObjectMapper().reader()
                        .forType(new TypeReference<List<TableFieldSchema>>() {})
                        .readValue(tableSchemaJson));

        final Pipeline p = Pipeline.create(options);

        PCollection<TableRow> inputRows = p.apply("ReadFromBQ", BigQueryIO.readTableRows()
                .fromQuery("SELECT tweet_id, ct, full_text, user FROM [<project_name>:tweet.SiroTalk]"));

        PCollection<TableRow> outputRows = inputRows.apply(ParDo.of(new ParseUserFn()));

        // Write to a table partitioned on the ct column.
        outputRows.apply("WriteToBQ", BigQueryIO.writeTableRows()
                .to("<project_name>:tweet.SiroTalkJava")
                .withSchema(tableSchema)
                .withTimePartitioning(new TimePartitioning().setField("ct"))
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_EMPTY));

        p.run().waitUntilFinish();
    }
}
schema.json
[
  {
    "name": "tweet_id",
    "type": "INTEGER"
  },
  {
    "name": "ct",
    "type": "TIMESTAMP"
  },
  {
    "name": "create_time",
    "type": "DATETIME"
  },
  {
    "name": "full_text",
    "type": "STRING"
  },
  {
    "name": "user_id",
    "type": "INTEGER"
  },
  {
    "name": "user_screen_name",
    "type": "STRING"
  },
  {
    "name": "user_name",
    "type": "STRING"
  }
]
Run
mvn compile exec:java \
-Dexec.mainClass=com.example.ParseUser \
-Dexec.args="--project=<project-id> \
--stagingLocation=<bucket_path>/staging/ \
--runner=DataflowRunner"
The schema of the output table is written in schema.json, which is placed directly under the directory where the command is run. Here too, if you use ParseUserPipelineOptions properly you can pass these values as arguments at run time, but for now they are hard-coded.
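For comparison with the Python version, the same field list could also be turned into the comma-separated `name:TYPE` string that the Python pipeline passes as `schema`. This is only a sketch: the helper name `schema_string` is hypothetical, and it assumes a flat list of fields with no nested RECORD types.

```python
import json


def schema_string(fields):
    """Build a 'name:TYPE, name:TYPE' schema string from schema.json fields."""
    return ', '.join('{}:{}'.format(f['name'], f['type']) for f in fields)


# Same structure as schema.json, shortened to two fields for the example.
fields = json.loads('''
[
  {"name": "tweet_id", "type": "INTEGER"},
  {"name": "ct", "type": "TIMESTAMP"}
]
''')
print(schema_string(fields))  # tweet_id:INTEGER, ct:TIMESTAMP
```

Keeping the schema in one JSON file and deriving the string from it would avoid maintaining the column list in two places.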
As in the Python version, I read the rows with a query and process them with ParseUserFn. The way this part is written is peculiar to Beam, and I mostly just imitated the sample.
Table after processing
As for partitioning the BigQuery output, in Java you specify it with the `.withTimePartitioning` option.
Because only a TIMESTAMP or DATE column can be specified as the partitioning column, this time I wrote ct, which I had been holding as a Long, as a TIMESTAMP and made it the partition column.
When you load data into BigQuery in the ordinary way, a notice saying `This is a partitioned table` appears, but it did not appear for the table written this way.
However, the amount of data processed by a query with a WHERE condition on ct did go down as expected, so it should be okay ...
Is this the difference between partitioning on _PARTITIONTIME and partitioning on a column?
Original table
<img width="303" alt="Screenshot 2018-08-05 17.43.18.png" src="https://qiita-image-store.s3.amazonaws.com/0/265518/ceef5162-f013-a8d4-7048-805f4f3e25b3.png">
Table after processing
<img width="222" alt="Screenshot 2018-08-05 17.43.24.png" src="https://qiita-image-store.s3.amazonaws.com/0/265518/a0ca872d-0e44-9d82-182b-684175475089.png">
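One thing worth keeping in mind with a table partitioned on a TIMESTAMP column: BigQuery assigns rows to daily partitions by the UTC date of that column, while create_time in the Java program is formatted in Asia/Tokyo. The sketch below shows that the two dates can differ for the same ct. The helper names `utc_partition_day` and `jst_day` are hypothetical, and the code needs Python 3.

```python
from datetime import datetime, timedelta, timezone

JST = timezone(timedelta(hours=9))  # Asia/Tokyo is a fixed UTC+9 offset


def utc_partition_day(ct):
    """Return the UTC date (YYYYMMDD) that an epoch-seconds value falls on."""
    return datetime.fromtimestamp(ct, timezone.utc).strftime('%Y%m%d')


def jst_day(ct):
    """Return the Asia/Tokyo date (YYYYMMDD) for the same instant."""
    return datetime.fromtimestamp(ct, JST).strftime('%Y%m%d')


# 2018-08-05 00:30:00 in Asia/Tokyo is still 2018-08-04 in UTC.
ct = int(datetime(2018, 8, 5, 0, 30, tzinfo=JST).timestamp())
print(utc_partition_day(ct))  # 20180804
print(jst_day(ct))            # 20180805
```

So a filter meant to cover one day in Japan time should be written against ct with the nine-hour offset taken into account, rather than by assuming one partition per local day.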
I ran this with Apache Beam 2.4.0; when I upgraded to 2.5.0, the latest version, it stopped working.
I haven't investigated in detail, but the error said that a function did not exist.
# 4. Make the processed table visible on the dashboard ...
That is next ... though I may not get to it right away.
This time, I started with simple data processing.
Pipelines can be used in other ways too, so there are probably many more things that can be done.
It was hard because there was so little information ...
I haven't done anything complicated yet, so I can't say whether Python or Java is better.
That said, Python was the easier of the two to get working.
That's all for this time.