[JAVA] Trier les tables BigQuery en fonction des données dans Dataflow

Avec Google Cloud Dataflow, vous pouvez générer des données vers plusieurs emplacements en branchant le pipeline. Cependant, avec cette méthode, la destination de sortie doit être décidée à l'avance et il n'est pas possible de faire quelque chose comme "sortie vers n'importe quelle table BigQuery spécifiée dans les données" (probablement). Je cherchais un moyen de réaliser une telle ** distribution de sortie dynamique **, et en tant que but général DynamicDestinations J'ai trouvé que je devais utiliser la classe .0 / org / apache / beam / sdk / io / gcp / bigquery / DynamicDestinations.html).

Dans cet article, nous allons essayer la distribution de table de deux manières, la méthode utilisant ce DynamicDestinations et la méthode simplifiée.

Exemple de configuration

Comme je ne suis intéressé que par la sortie cette fois, je vais faire "enregistrer le texte obtenu à partir de Pub / Sub tel qu'il est dans BigQuery" sans traitement de données supplémentaire. Cependant, le pipeline est ramifié et deux types de traitement de sortie sont ajoutés afin qu'il puisse être comparé à la sortie d'une seule table normale.

dd-pipeline.png

Voici un exemple spécifique des données stockées dans BigQuery.

$ table="<project_id>:<dataset_id>.<table_id>"
$ bq head -n 3 "${table}"
+-----------+
|   text    |
+-----------+
| Charizard |
| Alakazam  |
| Nidoqueen |
+-----------+
$ bq head -n 3 "${table}_C"
+------------+
|    text    |
+------------+
| Cloyster   |
| Clefairy   |
| Charmander |
+------------+

$ {table} contient toutes les données d'entrée, alors que $ {table} _C contient uniquement le texte commençant par C, par exemple. Chaque table de distribution est un nom généré par le code, et seuls ceux dont vous avez besoin sont créés au moment de l'exécution.

code

src/main/java/com/example/dataflow/DynamicDestinationsPipeline.java


package com.example.dataflow;

import java.util.ArrayList;
import java.util.List;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.ValueInSingleWindow;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;

/**
 * An example of BigQuery dynamic destinations.
 *
 * <p>To run this example using managed resource in Google Cloud
 * Platform, you should specify the following command-line options:
 *   --project=<YOUR_PROJECT_ID>
 *   --jobName=<JOB_NAME>
 *   --stagingLocation=<STAGING_LOCATION_IN_CLOUD_STORAGE>
 *   --tempLocation=<STAGING_LOCATION_IN_CLOUD_STORAGE>
 *   --runner=DataflowRunner
 *   --input=<PUBSUB_INPUT_TOPIC>
 *   --output=<BIGQUERY_OUTPUT_TABLE>
 */
public class DynamicDestinationsPipeline {
  /**
   * Add commandline options: input (Pub/Sub topic ID) and output (BigQuery table ID).
   */
  public interface MyOptions extends PipelineOptions {
    @Description("PubSub topic to read from, specified as projects/<project_id>/topics/<topic_id>")
    @Validation.Required
    String getInput();
    void setInput(String value);

    @Description("BigQuery table to write to, specified as <project_id>:<dataset_id>.<table_id>")
    @Validation.Required
    String getOutput();
    void setOutput(String value);
  }

  /**
   * Define BigQuery dynamic destinations for {@code PCollection<String>} input elements.
   * Destination table name will be {@code <table_id>_<initial for input string>}.
   */
  static class MyDestinations extends DynamicDestinations<String, String> {
    private final String tablePrefix;

    MyDestinations(String tableId) {
      tablePrefix = tableId + "_";
    }

    /**
     * Returns a destination table specifier: initial for input string.
     */
    @Override
    public String getDestination(ValueInSingleWindow<String> element) {
      return element.getValue().substring(0, 1);
    }

    /**
     * Returns a TableDestination object for the destination.
     * The table name will be {@code <table_id>_<initial for input string>}.
     */
    @Override
    public TableDestination getTable(String destination) {
      return new TableDestination(tablePrefix + destination, "Table for initial " + destination);
    }

    /**
     * Returns a table schema for the destination.
     * The table has only one column: text as STRING.
     */
    @Override
    public TableSchema getSchema(String destination) {
      List<TableFieldSchema> fields = new ArrayList<>();
      fields.add(new TableFieldSchema().setName("text").setType("STRING"));
      return new TableSchema().setFields(fields);
    }
  }

  /**
   * A formatter to convert an input text to a BigQuery table row.
   */
  static class MyFormatFunction implements SerializableFunction<String, TableRow> {
    @Override
    public TableRow apply(String input) {
      return new TableRow().set("text", input);
    }
  }

  /**
   * Run a pipeline.
   * It reads texts from a Pub/Sub topic and writes them to BigQuery tables.
   * The master table saves all the texts, and other tables save the texts with same initials.
   */
  public static void main(String[] args) {
    MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);

    List<TableFieldSchema> fields = new ArrayList<>();
    fields.add(new TableFieldSchema().setName("text").setType("STRING"));
    TableSchema schema = new TableSchema().setFields(fields);

    Pipeline p = Pipeline.create(options);

    PCollection<String> texts = p.apply("ReadFromPubSub", PubsubIO.readStrings()
        .fromTopic(options.getInput()));
    texts.apply("WriteToBigQuery", BigQueryIO.<String>write()
        .to(options.getOutput())
        .withSchema(schema)
        .withFormatFunction(new MyFormatFunction())
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
    texts.apply("WriteToBigQueryShards", BigQueryIO.<String>write()
        .to(new MyDestinations(options.getOutput()))
        .withFormatFunction(new MyFormatFunction())
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

    p.run();
  }
}

[^ invalid-initial]: le traitement des caractères qui ne peuvent pas être utilisés dans les noms de table a été ignoré.


Avant d'exécuter, créez un sujet Pub / Sub et un ensemble de données BigQuery. Un compartiment de stockage est également requis lors de la spécification de stagingLocation, etc.

Vous pouvez publier sur Pub / Sub à partir de l'interface graphique, mais j'ai utilisé le script Ruby suivant pour diffuser le fichier texte.

publish.rb


require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new(
  project_id: "<project_id>",
  credentials: "credential.json"
)
topic = pubsub.topic "<topic_id>"

while (text = gets)
  topic.publish text.chomp
end
$ head -3 list.txt
Bulbasaur
Ivysaur
Venusaur
$ ruby publish.rb < list.txt

Où ça se coince

Au début, j'ai également passé le schéma à la classe MyDestinations, mais quand je l'ai exécuté, j'ai obtenu une ʻIllegalArgumentException. Après la recherche, j'ai trouvé [Même erreur](https://stackoverflow.com/questions/46165895/dataflow-dynamicdestinations-unable-to-serialize-org-apache-beam-sdk-io-gcp-bigq) et sérialiser. J'ai trouvé que je ne pouvais pas mettre ce que je ne pouvais pas faire dans la classe. Pour le moment, j'ai décidé de le créer à chaque fois dans getSchema ()` pour éviter l'erreur.

Tri simple

Non limité à cet exemple, il peut y avoir des cas où ** les tables sont triées mais le schéma est exactement le même **. Dans ce cas, vous pouvez spécifier SerializableFunction dans à () pour rendre uniquement le nom de la table dynamique (BigQueryIO. (Voir «Partage des tables de sortie BigQuery» dans /org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.html). Le schéma est généralement spécifié avec withSchema ().

Le contenu de SerializableFunction implémente uniquement le processus de combinaison de getDestination () ʻet getTable () ʻof DynamicDestinations. Le code ci-dessous crée une variable locale appelée destination, il devrait donc être facile de comprendre la correspondance avec le code précédent.

  /**
   * Define BigQuery dynamic destinations for {@code PCollection<String>} input elements.
   * Destination table name will be {@code <table_id>_<initial for input string>}.
   */
  static class MyDestinationsFunction
      implements SerializableFunction<ValueInSingleWindow<String>, TableDestination> {
    private final String tablePrefix;

    MyDestinationsFunction(String tableId) {
      tablePrefix = tableId + "_";
    }

    @Override
    public TableDestination apply(ValueInSingleWindow<String> input) {
      String destination = input.getValue().substring(0, 1);
      return new TableDestination(tablePrefix + destination, "Table for initial " + destination);
    }
  }

...

    texts.apply("WriteToBigQueryShards", BigQueryIO.<String>write()
        .to(new MyDestinationsFunction(options.getOutput()))
        .withSchema(schema)
        .withFormatFunction(new MyFormatFunction())
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

Ce que je veux faire à partir de maintenant

référence

Recommended Posts

Trier les tables BigQuery en fonction des données dans Dataflow
Essayez de mettre des données dans MongoDB
Autoriser Jupyter Notebook à incorporer des données audio dans des tableaux HTML pour la lecture
Comment utiliser BigQuery en Python
Vider les tables BigQuery dans GCS à l'aide de Python
Utilisez Cloud Dataflow pour modifier dynamiquement la destination en fonction de la valeur des données et enregistrez-la dans GCS
SELECT des données à l'aide de la bibliothèque cliente avec BigQuery
Livres sur la science des données à lire en 2020
Comment créer des données à mettre dans CNN (Chainer)
Comment lire les données de séries chronologiques dans PyTorch
J'ai essayé d'implémenter le tri sélectif en python
[Python] Comment trier un dict dans une liste et une instance dans une liste
Vérifier l'existence de tables BigQuery en Java
Essayez de déchiffrer les données de connexion stockées dans Firefox
Trier les données de publication dans l'ordre inverse avec ListView de Django