This is an introductory summary I put together while using Cloud Dataflow at work. I'm not an expert in the data processing field, so there may be mistakes in the description. If you spot any, please let me know :)
A fully managed service on Google Cloud Platform for running the stream / batch data processing engine (Apache Beam).
What is it? If you roughly break it down into its elements, it feels like this:
(*) As of 2018/02, streaming is supported only in the Java SDK.
By using Cloud Pub/Sub (another GCP service) as the message bus and combining it with Cloud IoT Core (MQTT protocol bridge) and data stores such as GCS and BigQuery, you can, for example, receive, transform, and consolidate data sent from devices.
A rough usage model, based on Google's sample diagrams.
It's hard to get a concrete picture without actually using it, so this time we'll build something using this flow as an example.
Since I wanted to write about the integration around Pub/Sub, I made Cloud Pub/Sub the entry point, but of course you could also, for example, process data from tables A and B in BigQuery and load the result into table C.
Add the following to pom.xml to pull in the SDK.
<dependency>
  <groupId>com.google.cloud.dataflow</groupId>
  <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
  <version>[2.2.0, 2.99)</version>
</dependency>
For Maven and other build setups, please check the official documentation for details.
First, set the options for the pipeline you are going to run.
DataflowPipelineOptions options = PipelineOptionsFactory.create()
.as(DataflowPipelineOptions.class);
//Specify the project name you are using
options.setProject("your gcp project");
//Create and specify GCS Bucket used by Dataflow for staging
options.setStagingLocation("gs://hoge/staging");
//Create and specify a GCS Bucket that Dataflow temporarily uses
options.setTempLocation("gs://hoge/tmp");
//Specify the runner to execute. Specify DataflowRunner when executing on GCP. DirectRunner for local execution
options.setRunner(DataflowRunner.class);
//Enable streaming
options.setStreaming(true);
//Specify the job name (jobs with the same name cannot run at the same time)
options.setJobName("sample");
The official documentation explains these options as well, so I'll just note the rough points.
(*) In my actual code I wanted to keep the settings separate from the code, so I use Java's ResourceBundle, but if you just want it to work, direct values like the above are fine.
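For reference, here is a minimal sketch of the ResourceBundle approach; the config.properties file name, its keys, and the OptionsLoader class are my own example, not from the original code.

import java.util.ResourceBundle;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;

public class OptionsLoader {
    // Reads values from a hypothetical config.properties on the classpath:
    //   project=your-gcp-project
    //   stagingLocation=gs://hoge/staging
    //   tempLocation=gs://hoge/tmp
    public static void applyConfig(DataflowPipelineOptions options) {
        ResourceBundle config = ResourceBundle.getBundle("config");
        options.setProject(config.getString("project"));
        options.setStagingLocation(config.getString("stagingLocation"));
        options.setTempLocation(config.getString("tempLocation"));
    }
}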
When programming Dataflow jobs, you will basically be dealing with the following concepts.
Pipeline --An object that represents a processing job. You basically apply the processing flow (input / transform / output) to the Pipeline.
PCollection --An object that represents the data
Transform --The processing step that converts input data into output data
Pipeline I/O --The definition of an input or output
Basically, you build a job by applying the transforms you wrote, together with the necessary Pipeline I/O, to the Pipeline.
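To make the mapping concrete, here is a minimal batch sketch of my own (the package name and GCS paths are hypothetical; it is not part of the original example) with each concept labeled:

package com.mycompany.dataflow_sample;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

public class ConceptSketch {
    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.create();        // defaults to the DirectRunner
        Pipeline p = Pipeline.create(options);                            // Pipeline: the processing job
        p.apply(TextIO.read().from("gs://hoge/input.txt"))                // Pipeline I/O: input -> PCollection<String>
         .apply(MapElements.into(TypeDescriptors.strings())
                           .via((String line) -> line.toUpperCase()))     // Transform: convert each element
         .apply(TextIO.write().to("gs://hoge/output"));                   // Pipeline I/O: output
        p.run();                                                          // run the job
    }
}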
The three operations described in the figure above will be applied to the pipeline.
//Create the Pipeline object (the job to process)
Pipeline p = Pipeline.create(options);
TableSchema schema = SampleSchemaFactory.create();
//Apply the processing content
//Read data from pubsub subscription
p.apply(PubsubIO.readStrings().fromSubscription("your pubsub subscription"))
//Window into fixed 5-minute windows (optional)
.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(5))))
//Set conversions for input from pubsub(Implementation will be described later)
.apply(ParDo.of(new BigQueryRowConverter()))
//Set up writing to BigQuery
.apply("WriteToBQ", BigQueryIO.writeTableRows()
//Specify the write destination table name
.to("dataset_name:table_name")
//Define the schema of the write destination with Object and pass it
.withSchema(schema)
//Create the table if it does not exist (optional)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
//Append data to the end of the table (optional)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
//Run
p.run();
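Note that for a streaming job on the DataflowRunner, p.run() returns once the job has been submitted. If you want the process to block until the job reaches a terminal state (mainly useful with the DirectRunner or for batch jobs), the usual Beam pattern looks like this; this is a general remark, not something from the original post:

// PipelineResult is org.apache.beam.sdk.PipelineResult
PipelineResult result = p.run();
result.waitUntilFinish(); // block until the job terminates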
BigQueryRowConverter
This is the BigQueryRowConverter applied with ParDo above. For a transform using ParDo, you extend DoFn and implement everything from reading the input element to converting it into the output for the PCollection inside processElement, the method annotated with @ProcessElement.
package com.mycompany.dataflow_sample.converter;
import com.google.api.services.bigquery.model.TableRow;
import com.google.gson.Gson;
import com.mycompany.dataflow_sample.entity.SampleInputJson;
import org.apache.beam.sdk.transforms.DoFn;
public class BigQueryRowConverter extends DoFn<String,TableRow> {
@ProcessElement
public void processElement(ProcessContext dofn) throws Exception {
//Receive input
String json = dofn.element();
Gson gson = new Gson();
//Convert json to object
SampleInputJson jsonObj = gson.fromJson(json,SampleInputJson.class);
//Convert the contents of json to tableRow of bigquery
TableRow output = new TableRow();
TableRow attributesOutput = new TableRow();
TableRow attr2Output = new TableRow();
//Set data in output (nested field names must match the BigQuery schema)
attributesOutput.set("attr1", jsonObj.attributes.attr1);
attr2Output.set("prop1", jsonObj.attributes.attr2.prop1);
attr2Output.set("prop2", jsonObj.attributes.attr2.prop2);
attributesOutput.set("attr2", attr2Output);
output.set("attributes", attributesOutput);
output.set("name", jsonObj.name);
output.set("ts", jsonObj.timeStamp/1000);
//Output
dofn.output(output);
}
}
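The SampleInputJson entity class is not shown in this post; the following is a hypothetical sketch inferred from the fields that BigQueryRowConverter reads, so adjust it to your actual message format.

package com.mycompany.dataflow_sample.entity;

import java.io.Serializable;

// Hypothetical entity inferred from the fields read in BigQueryRowConverter.
public class SampleInputJson implements Serializable {
    public String name;
    public long timeStamp;
    public Attributes attributes;

    public static class Attributes implements Serializable {
        public String attr1;
        public Attr2 attr2;
    }

    public static class Attr2 implements Serializable {
        public long prop1;   // schema type INTEGER
        public String prop2; // schema type STRING
    }
}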
SampleSchemaFactory.create()
This is the implementation of the SampleSchemaFactory.create() called above.
package com.mycompany.dataflow_sample.schema;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.ArrayList;
import java.util.List;
public class SampleSchemaFactory {
public static TableSchema create() {
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("name").setType("STRING"));
fields.add(new TableFieldSchema().setName("ts").setType("TIMESTAMP"));
fields.add(new TableFieldSchema().setName("attributes").setType("RECORD")
.setFields(new ArrayList<TableFieldSchema>() {
{
add(new TableFieldSchema().setName("attr1").setType("STRING"));
add(new TableFieldSchema().setName("attr2").setType("RECORD")
.setFields(new ArrayList<TableFieldSchema>() {
{
add(new TableFieldSchema().setName("prop1").setType("INTEGER"));
add(new TableFieldSchema().setName("prop2").setType("STRING"));
}
})
);
}
})
);
TableSchema schema = new TableSchema().setFields(fields);
return schema;
}
}
Like this.
Just build and run the Java code you wrote earlier and the job will actually be deployed on GCP.
Send a message from the GCP console.
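If you want to publish the test message from code instead of the console, here is a sketch using the google-cloud-pubsub client library (a separate dependency from the Dataflow SDK above); the topic name and message contents are placeholders shaped to match the fields the converter expects.

package com.mycompany.dataflow_sample.publisher;

import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;

// Publishes one test message whose JSON shape matches SampleInputJson.
public class PublishSample {
    public static void main(String[] args) throws Exception {
        TopicName topic = TopicName.of("your gcp project", "your pubsub topic");
        Publisher publisher = Publisher.newBuilder(topic).build();
        try {
            String json = "{\"name\":\"device-1\",\"timeStamp\":1517814000000,"
                    + "\"attributes\":{\"attr1\":\"foo\",\"attr2\":{\"prop1\":1,\"prop2\":\"bar\"}}}";
            PubsubMessage message = PubsubMessage.newBuilder()
                    .setData(ByteString.copyFromUtf8(json))
                    .build();
            publisher.publish(message).get(); // wait for the publish to complete
        } finally {
            publisher.shutdown();
        }
    }
}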
Operation check is OK if data is inserted in BigQuery :)
You can check the logs from the details screen by clicking the created job in the console. Logs are automatically forwarded to Stackdriver, so you should use that for monitoring.
If an exception occurs while the Dataflow job is running, the ACK is not returned to Pub/Sub, so the data will be delivered and processed again.
Therefore:
--Even if processing fails in the load step to BigQuery, the Pub/Sub messages are not discarded and are retried automatically, so there is no need to implement retries at the communication level.
--On the other hand, if the data itself is invalid, the job will keep producing errors, so you need proper handling such as validating the input and moving bad records to an error destination, logging them, or discarding them (see the sketch below).
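As one possible way to do that handling (my own sketch, not part of the original sample), you can catch parse errors inside the DoFn and log and drop the bad record, so the message is ACKed instead of being redelivered forever; emitting to a dead-letter output or table would be a more robust variant. The attributes record is omitted here for brevity.

package com.mycompany.dataflow_sample.converter;

import com.google.api.services.bigquery.model.TableRow;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import com.mycompany.dataflow_sample.entity.SampleInputJson;
import org.apache.beam.sdk.transforms.DoFn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical variant of BigQueryRowConverter that validates input instead of throwing,
// so malformed messages are acknowledged and dropped rather than retried forever.
public class SafeBigQueryRowConverter extends DoFn<String, TableRow> {
    private static final Logger LOG = LoggerFactory.getLogger(SafeBigQueryRowConverter.class);

    @ProcessElement
    public void processElement(ProcessContext c) {
        String json = c.element();
        try {
            SampleInputJson jsonObj = new Gson().fromJson(json, SampleInputJson.class);
            if (jsonObj == null || jsonObj.name == null || jsonObj.attributes == null) {
                LOG.warn("Dropping message with missing fields: {}", json);
                return; // consider emitting to a dead-letter table instead of discarding
            }
            TableRow output = new TableRow();
            output.set("name", jsonObj.name);
            output.set("ts", jsonObj.timeStamp / 1000);
            c.output(output);
        } catch (JsonSyntaxException e) {
            LOG.warn("Dropping malformed JSON: {}", json, e);
        }
    }
}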
The sample is here. I wanted to split the data by day, so I made the job create a daily table based on the date at execution time.
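For reference, a rough sketch of how a per-day table spec could be computed at job construction time (the naming convention here is my guess, not necessarily what the linked sample does):

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

// Hypothetical: build a per-day table spec such as dataset_name:table_name_20180201
// and pass it to BigQueryIO.writeTableRows().to(...)
String suffix = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMMdd"));
String dailyTableSpec = "dataset_name:table_name_" + suffix;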
References:
--Official sample code collection (GitHub)
--Apache Beam SDK
--Google official documentation