[JAVA] Routing data to BigQuery tables according to its content in Dataflow

With Google Cloud Dataflow, you can output data to multiple locations by branching the pipeline. With that approach, however, the destinations must be decided in advance; as far as I can tell, you cannot do something like "output to whichever BigQuery table the data itself specifies". I was looking for a way to achieve this kind of **dynamic output routing**, and found that the general-purpose DynamicDestinations class (see the org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations Javadoc) is the one to use.

In this article, I will try routing rows to tables in two ways: with this DynamicDestinations class, and with a simplified method.

Sample configuration

Since I am only interested in the output side this time, the pipeline simply saves text received from Pub/Sub into BigQuery as is, without any extra processing. The pipeline is branched, however, and two output transforms are attached so that the dynamic routing can be compared with output to a normal single table.

(Figure: dd-pipeline.png — the pipeline, with one Pub/Sub source branching into two BigQuery outputs)

Specific examples of data stored in BigQuery are as follows.

$ table="<project_id>:<dataset_id>.<table_id>"
$ bq head -n 3 "${table}"
+-----------+
|   text    |
+-----------+
| Charizard |
| Alakazam  |
| Nidoqueen |
+-----------+
$ bq head -n 3 "${table}_C"
+------------+
|    text    |
+------------+
| Cloyster   |
| Clefairy   |
| Charmander |
+------------+

${table} contains all the input data, while ${table}_C, for example, contains only the texts that start with C. The names of the routed tables are generated in the code[^invalid-initial], and only the ones actually needed are created at runtime.
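
To check which routed tables were actually created, listing the dataset with the bq CLI is a quick way (the dataset name is a placeholder):

$ bq ls "<project_id>:<dataset_id>"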

Code

src/main/java/com/example/dataflow/DynamicDestinationsPipeline.java


package com.example.dataflow;

import java.util.ArrayList;
import java.util.List;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.ValueInSingleWindow;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;

/**
 * An example of BigQuery dynamic destinations.
 *
 * <p>To run this example using managed resources in Google Cloud
 * Platform, you should specify the following command-line options:
 *   --project=<YOUR_PROJECT_ID>
 *   --jobName=<JOB_NAME>
 *   --stagingLocation=<STAGING_LOCATION_IN_CLOUD_STORAGE>
 *   --tempLocation=<STAGING_LOCATION_IN_CLOUD_STORAGE>
 *   --runner=DataflowRunner
 *   --input=<PUBSUB_INPUT_TOPIC>
 *   --output=<BIGQUERY_OUTPUT_TABLE>
 */
public class DynamicDestinationsPipeline {
  /**
   * Add commandline options: input (Pub/Sub topic ID) and output (BigQuery table ID).
   */
  public interface MyOptions extends PipelineOptions {
    @Description("PubSub topic to read from, specified as projects/<project_id>/topics/<topic_id>")
    @Validation.Required
    String getInput();
    void setInput(String value);

    @Description("BigQuery table to write to, specified as <project_id>:<dataset_id>.<table_id>")
    @Validation.Required
    String getOutput();
    void setOutput(String value);
  }

  /**
   * Define BigQuery dynamic destinations for {@code PCollection<String>} input elements.
   * Destination table name will be {@code <table_id>_<initial for input string>}.
   */
  static class MyDestinations extends DynamicDestinations<String, String> {
    private final String tablePrefix;

    MyDestinations(String tableId) {
      tablePrefix = tableId + "_";
    }

    /**
     * Returns a destination table specifier: initial for input string.
     */
    @Override
    public String getDestination(ValueInSingleWindow<String> element) {
      return element.getValue().substring(0, 1);
    }

    /**
     * Returns a TableDestination object for the destination.
     * The table name will be {@code <table_id>_<initial for input string>}.
     */
    @Override
    public TableDestination getTable(String destination) {
      return new TableDestination(tablePrefix + destination, "Table for initial " + destination);
    }

    /**
     * Returns a table schema for the destination.
     * The table has only one column: text as STRING.
     */
    @Override
    public TableSchema getSchema(String destination) {
      List<TableFieldSchema> fields = new ArrayList<>();
      fields.add(new TableFieldSchema().setName("text").setType("STRING"));
      return new TableSchema().setFields(fields);
    }
  }

  /**
   * A formatter to convert an input text to a BigQuery table row.
   */
  static class MyFormatFunction implements SerializableFunction<String, TableRow> {
    @Override
    public TableRow apply(String input) {
      return new TableRow().set("text", input);
    }
  }

  /**
   * Run a pipeline.
   * It reads texts from a Pub/Sub topic and writes them to BigQuery tables.
   * The master table saves all the texts, and each of the other tables saves the texts with the same initial.
   */
  public static void main(String[] args) {
    MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);

    List<TableFieldSchema> fields = new ArrayList<>();
    fields.add(new TableFieldSchema().setName("text").setType("STRING"));
    TableSchema schema = new TableSchema().setFields(fields);

    Pipeline p = Pipeline.create(options);

    PCollection<String> texts = p.apply("ReadFromPubSub", PubsubIO.readStrings()
        .fromTopic(options.getInput()));
    texts.apply("WriteToBigQuery", BigQueryIO.<String>write()
        .to(options.getOutput())
        .withSchema(schema)
        .withFormatFunction(new MyFormatFunction())
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
    texts.apply("WriteToBigQueryShards", BigQueryIO.<String>write()
        .to(new MyDestinations(options.getOutput()))
        .withFormatFunction(new MyFormatFunction())
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

    p.run();
  }
}

[^invalid-initial]: Dealing with characters that cannot be used in table names is skipped here.


Create the Pub/Sub topic and the BigQuery dataset before running. If you specify stagingLocation and similar options, you also need a Cloud Storage bucket.
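
For reference, here is a sketch of the setup and launch commands, assuming the gcloud/bq/gsutil CLIs and the exec-maven-plugin configuration used in the Beam quickstarts (all resource names are placeholders):

$ gcloud pubsub topics create <topic_id>
$ bq mk --dataset <project_id>:<dataset_id>
$ gsutil mb gs://<bucket_name>
$ mvn compile exec:java \
    -Dexec.mainClass=com.example.dataflow.DynamicDestinationsPipeline \
    -Dexec.args="--project=<project_id> \
    --stagingLocation=gs://<bucket_name>/staging \
    --tempLocation=gs://<bucket_name>/temp \
    --runner=DataflowRunner \
    --input=projects/<project_id>/topics/<topic_id> \
    --output=<project_id>:<dataset_id>.<table_id>"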

You can publish to Pub/Sub from the Cloud Console GUI, but I used the following Ruby script to stream a text file into the topic.

publish.rb


require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new(
  project_id: "<project_id>",
  credentials: "credential.json"
)
topic = pubsub.topic "<topic_id>"

while (text = gets)
  topic.publish text.chomp
end

$ head -3 list.txt
Bulbasaur
Ivysaur
Venusaur
$ ruby publish.rb < list.txt
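
For a quick one-off test without the script, recent gcloud versions can also publish directly (the topic name is a placeholder):

$ gcloud pubsub topics publish "<topic_id>" --message "Bulbasaur"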

Where I got stuck

At first I also passed the schema to the MyDestinations class as a field, but running the pipeline raised an IllegalArgumentException. After searching, I found [the same error](https://stackoverflow.com/questions/46165895/dataflow-dynamicdestinations-unable-to-serialize-org-apache-beam-sdk-io-gcp-bigq) and learned that you must not put non-serializable objects into the class. For the time being, I decided to build the schema from scratch every time in getSchema() to avoid the error.
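
Another workaround, if you do want to hand the schema to the class, is to store it in a serializable form. Here is a minimal sketch, assuming the BigQueryHelpers.toJsonString()/fromJsonString() utilities from org.apache.beam.sdk.io.gcp.bigquery (check that your Beam version exposes them):

  static class MySerializableSchemaDestinations extends DynamicDestinations<String, String> {
    // TableSchema itself is not Serializable, so keep it as a JSON String.
    // Requires: import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
    private final String tablePrefix;
    private final String schemaJson;

    MySerializableSchemaDestinations(String tableId, TableSchema schema) {
      tablePrefix = tableId + "_";
      schemaJson = BigQueryHelpers.toJsonString(schema);
    }

    @Override
    public String getDestination(ValueInSingleWindow<String> element) {
      return element.getValue().substring(0, 1);
    }

    @Override
    public TableDestination getTable(String destination) {
      return new TableDestination(tablePrefix + destination, "Table for initial " + destination);
    }

    @Override
    public TableSchema getSchema(String destination) {
      // Rebuild the schema on demand instead of holding the non-serializable object.
      return BigQueryHelpers.fromJsonString(schemaJson, TableSchema.class);
    }
  }

This keeps only a String field in the class, so serialization succeeds while the schema still travels with it.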

Simple routing

Not limited to this example, there are cases where rows are **routed to different tables but the schema is exactly the same**. In that case, you can pass a SerializableFunction to to() so that only the table name is dynamic (see "Sharding BigQuery output tables" in the BigQueryIO Javadoc). The schema is specified as usual with withSchema().

The SerializableFunction just combines what getDestination() and getTable() of DynamicDestinations did. The code below stores the intermediate result in a local variable called destination, which should make the correspondence with the previous code easy to follow.

  /**
   * Define BigQuery dynamic destinations for {@code PCollection<String>} input elements.
   * Destination table name will be {@code <table_id>_<initial for input string>}.
   */
  static class MyDestinationsFunction
      implements SerializableFunction<ValueInSingleWindow<String>, TableDestination> {
    private final String tablePrefix;

    MyDestinationsFunction(String tableId) {
      tablePrefix = tableId + "_";
    }

    @Override
    public TableDestination apply(ValueInSingleWindow<String> input) {
      String destination = input.getValue().substring(0, 1);
      return new TableDestination(tablePrefix + destination, "Table for initial " + destination);
    }
  }

...

    texts.apply("WriteToBigQueryShards", BigQueryIO.<String>write()
        .to(new MyDestinationsFunction(options.getOutput()))
        .withSchema(schema)
        .withFormatFunction(new MyFormatFunction())
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
