Avec Google Cloud Dataflow, vous pouvez générer des données vers plusieurs emplacements en branchant le pipeline. Cependant, avec cette méthode, la destination de sortie doit être décidée à l'avance et il n'est pas possible de faire quelque chose comme "sortie vers n'importe quelle table BigQuery spécifiée dans les données" (probablement). Je cherchais un moyen de réaliser une telle ** distribution de sortie dynamique **, et en tant que but général DynamicDestinations
J'ai trouvé que je devais utiliser la classe .0 / org / apache / beam / sdk / io / gcp / bigquery / DynamicDestinations.html).
Dans cet article, nous allons essayer la distribution de table de deux manières, la méthode utilisant ce DynamicDestinations
et la méthode simplifiée.
Comme je ne suis intéressé que par la sortie cette fois, je vais faire "enregistrer le texte obtenu à partir de Pub / Sub tel qu'il est dans BigQuery" sans traitement de données supplémentaire. Cependant, le pipeline est ramifié et deux types de traitement de sortie sont ajoutés afin qu'il puisse être comparé à la sortie d'une seule table normale.
Voici un exemple spécifique des données stockées dans BigQuery.
$ table="<project_id>:<dataset_id>.<table_id>"
$ bq head -n 3 "${table}"
+-----------+
| text |
+-----------+
| Charizard |
| Alakazam |
| Nidoqueen |
+-----------+
$ bq head -n 3 "${table}_C"
+------------+
| text |
+------------+
| Cloyster |
| Clefairy |
| Charmander |
+------------+
$ {table}
contient toutes les données d'entrée, alors que $ {table} _C
contient uniquement le texte commençant par C, par exemple. Chaque table de distribution est un nom généré par le code, et seuls ceux dont vous avez besoin sont créés au moment de l'exécution.
src/main/java/com/example/dataflow/DynamicDestinationsPipeline.java
package com.example.dataflow;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
/**
* An example of BigQuery dynamic destinations.
*
* <p>To run this example using managed resource in Google Cloud
* Platform, you should specify the following command-line options:
* --project=<YOUR_PROJECT_ID>
* --jobName=<JOB_NAME>
* --stagingLocation=<STAGING_LOCATION_IN_CLOUD_STORAGE>
* --tempLocation=<STAGING_LOCATION_IN_CLOUD_STORAGE>
* --runner=DataflowRunner
* --input=<PUBSUB_INPUT_TOPIC>
* --output=<BIGQUERY_OUTPUT_TABLE>
*/
public class DynamicDestinationsPipeline {
/**
* Add commandline options: input (Pub/Sub topic ID) and output (BigQuery table ID).
*/
public interface MyOptions extends PipelineOptions {
@Description("PubSub topic to read from, specified as projects/<project_id>/topics/<topic_id>")
@Validation.Required
String getInput();
void setInput(String value);
@Description("BigQuery table to write to, specified as <project_id>:<dataset_id>.<table_id>")
@Validation.Required
String getOutput();
void setOutput(String value);
}
/**
* Define BigQuery dynamic destinations for {@code PCollection<String>} input elements.
* Destination table name will be {@code <table_id>_<initial for input string>}.
*/
static class MyDestinations extends DynamicDestinations<String, String> {
private final String tablePrefix;
MyDestinations(String tableId) {
tablePrefix = tableId + "_";
}
/**
* Returns a destination table specifier: initial for input string.
*/
@Override
public String getDestination(ValueInSingleWindow<String> element) {
return element.getValue().substring(0, 1);
}
/**
* Returns a TableDestination object for the destination.
* The table name will be {@code <table_id>_<initial for input string>}.
*/
@Override
public TableDestination getTable(String destination) {
return new TableDestination(tablePrefix + destination, "Table for initial " + destination);
}
/**
* Returns a table schema for the destination.
* The table has only one column: text as STRING.
*/
@Override
public TableSchema getSchema(String destination) {
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("text").setType("STRING"));
return new TableSchema().setFields(fields);
}
}
/**
* A formatter to convert an input text to a BigQuery table row.
*/
static class MyFormatFunction implements SerializableFunction<String, TableRow> {
@Override
public TableRow apply(String input) {
return new TableRow().set("text", input);
}
}
/**
* Run a pipeline.
* It reads texts from a Pub/Sub topic and writes them to BigQuery tables.
* The master table saves all the texts, and other tables save the texts with same initials.
*/
public static void main(String[] args) {
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("text").setType("STRING"));
TableSchema schema = new TableSchema().setFields(fields);
Pipeline p = Pipeline.create(options);
PCollection<String> texts = p.apply("ReadFromPubSub", PubsubIO.readStrings()
.fromTopic(options.getInput()));
texts.apply("WriteToBigQuery", BigQueryIO.<String>write()
.to(options.getOutput())
.withSchema(schema)
.withFormatFunction(new MyFormatFunction())
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
texts.apply("WriteToBigQueryShards", BigQueryIO.<String>write()
.to(new MyDestinations(options.getOutput()))
.withFormatFunction(new MyFormatFunction())
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
p.run();
}
}
to ()
en DynamicDestinations <T,?>
. Cette fois, afin de donner à la classe le nom de la table de base, le MyDestinations
hérité est utilisé comme argument.withSchema ()
n'est pas requis. Inclus dans «DynamicDestinations».getDestination ()
retourne un objet (destination) pour identifier la distribution des données. Cette fois, il s'agit du "texte initial" et le type est "String".getTable ()
retourne la table correspondant à la destination. Cette fois, les initiales sont ajoutées au nom de la table de base [^ invalid-initial].getSchema ()
retourne le schéma de la table correspondant à la destination. Cette fois, il est commun à toutes les tables, donc je crée exactement la même chose. (* Obstrué ici. Voir [Section suivante](# Bouché))[^ invalid-initial]: le traitement des caractères qui ne peuvent pas être utilisés dans les noms de table a été ignoré.
Avant d'exécuter, créez un sujet Pub / Sub et un ensemble de données BigQuery. Un compartiment de stockage est également requis lors de la spécification de stagingLocation, etc.
Vous pouvez publier sur Pub / Sub à partir de l'interface graphique, mais j'ai utilisé le script Ruby suivant pour diffuser le fichier texte.
publish.rb
require "google/cloud/pubsub"
pubsub = Google::Cloud::Pubsub.new(
project_id: "<project_id>",
credentials: "credential.json"
)
topic = pubsub.topic "<topic_id>"
while (text = gets)
topic.publish text.chomp
end
$ head -3 list.txt
Bulbasaur
Ivysaur
Venusaur
$ ruby publish.rb < list.txt
Au début, j'ai également passé le schéma à la classe MyDestinations
, mais quand je l'ai exécuté, j'ai obtenu une ʻIllegalArgumentException. Après la recherche, j'ai trouvé [Même erreur](https://stackoverflow.com/questions/46165895/dataflow-dynamicdestinations-unable-to-serialize-org-apache-beam-sdk-io-gcp-bigq) et sérialiser. J'ai trouvé que je ne pouvais pas mettre ce que je ne pouvais pas faire dans la classe. Pour le moment, j'ai décidé de le créer à chaque fois dans
getSchema ()` pour éviter l'erreur.
Non limité à cet exemple, il peut y avoir des cas où ** les tables sont triées mais le schéma est exactement le même **. Dans ce cas, vous pouvez spécifier SerializableFunction
dans à ()
pour rendre uniquement le nom de la table dynamique (BigQueryIO. (Voir «Partage des tables de sortie BigQuery» dans /org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.html). Le schéma est généralement spécifié avec withSchema ()
.
Le contenu de SerializableFunction
implémente uniquement le processus de combinaison de getDestination () ʻet
getTable () ʻof DynamicDestinations
. Le code ci-dessous crée une variable locale appelée destination
, il devrait donc être facile de comprendre la correspondance avec le code précédent.
/**
* Define BigQuery dynamic destinations for {@code PCollection<String>} input elements.
* Destination table name will be {@code <table_id>_<initial for input string>}.
*/
static class MyDestinationsFunction
implements SerializableFunction<ValueInSingleWindow<String>, TableDestination> {
private final String tablePrefix;
MyDestinationsFunction(String tableId) {
tablePrefix = tableId + "_";
}
@Override
public TableDestination apply(ValueInSingleWindow<String> input) {
String destination = input.getValue().substring(0, 1);
return new TableDestination(tablePrefix + destination, "Table for initial " + destination);
}
}
...
texts.apply("WriteToBigQueryShards", BigQueryIO.<String>write()
.to(new MyDestinationsFunction(options.getOutput()))
.withSchema(schema)
.withFormatFunction(new MyFormatFunction())
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
Recommended Posts