Un résumé d'introduction que j'ai examiné rapidement lors de l'utilisation de Cloud DataFlow au travail. Je ne suis pas un expert dans le domaine de l'informatique, j'ai donc commis des erreurs dans la description. Si vous l'avez, vous apprendrez donc si vous pouvez me le dire :)
Il s'agit d'un service entièrement géré de moteur de traitement de données par flux / par lots (Apache Beam) fourni par Google Cloud Platform.
Qu'est-ce que c'est? Parce que c'est comme Si vous décomposez grossièrement les éléments
Feeling (*) Depuis le 02/02/2018, seul Java prend en charge les flux
En utilisant Cloud Pub / Sub, qui est le même service GCP, en tant que bus de messages, En combinant avec des magasins de données tels que CloudIoTCore (MQTT Protocol Bridge), GCS et BigQuery, par exemple, la transmission de données à partir de terminaux peut être reçue, convertie et intégrée.
Un modèle d'utilisation approximatif vu à partir des exemples d'images de Google.
Il est difficile à comprendre à moins que vous ne l'utilisiez réellement, donc cette fois nous allons le construire en utilisant ce flux comme exemple.
Comme je voulais écrire la coopération autour de PubSub, j'ai défini l'entrée de CloudPubSub, mais bien sûr, il est possible de traiter les données des tables A et B dans BigQuery et de les charger dans la table C.
Téléchargez le SDK avec les éléments suivants dans pom.xml.
<dependency>
<groupId>com.google.cloud.dataflow</groupId>
<artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
<version>[2.2.0, 2.99)</version>
</dependency>
Si vous n'aimez pas maven, ou si vous avez quelque chose comme ça, consultez Officiel pour plus d'informations. S'il vous plaît.
Tout d'abord, définissez les options du pipeline à traiter.
DataflowPipelineOptions options = PipelineOptionsFactory.create()
.as(DataflowPipelineOptions.class);
//Spécifiez le nom du projet que vous utilisez
options.setProject("your gcp porject");
//Créer et spécifier le bucket GCS utilisé par Dataflow pour la préparation
options.setStagingLocation("gs://hoge/staging");
//Créer et spécifier un bucket GCS que Dataflow utilise temporairement
options.setTempLocation("gs://hoge/tmp");
//Spécifiez le coureur à exécuter. Spécifiez DataflowRunner lors de l'exécution sur GCP. DirectRunner pour une exécution locale
options.setRunner(DataflowRunner.class);
//Activer le streaming
options.setStreaming(true);
//Spécifiez le nom au moment de l'opération(Les travaux portant le même nom ne peuvent pas être exécutés en même temps
options.setJobName("sample");
Il y a aussi une explication dans Official, donc seulement un aperçu.
(*) Dans l'exemple ci-dessus, je voulais séparer le paramètre du code, donc j'utilise ResourceBundle de Java, mais si cela fonctionne, cela fonctionnera avec la valeur directe.
En gros, ce qui suit pour la programmation des tâches Dataflow Vous vous occuperez du concept.
PipeLine --Un objet qui représente un travail de traitement
En gros, appliquez le flux de traitement (entrée / conversion / sortie) à PipeLine.
PCollection --Objets qui représentent des données
conversion --Partie de traitement qui convertit les données d'entrée en données de sortie
PipeLineI/O --Définition de l'entrée ou de la sortie
En gros, appliquez le processus de conversion que vous avez écrit à PipeLine et le traitement nécessaire tel que PipeLine I / O. Vous construirez un emploi.
Trois actions décrites dans la figure ci-dessus
Sera appliqué au pipeline.
//Pipeline (tâche à traiter))Créer un objet
Pipeline p = Pipeline.create(options);
TableSchema schema = SampleSchemaFactory.create();
//Appliquer le contenu de traitement
//Lire les données de l'abonnement pubsub
p.apply(PubsubIO.readStrings().fromSubscription("your pubsub subscription"))
//Spécifiez les fenêtres toutes les 5 minutes(Non requis)
.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(5))))
//Définir la conversion pour l'entrée de pubsub(La mise en œuvre sera décrite plus tard)
.apply(ParDo.of(new BigQueryRowConverter()))
//Configurer pour écrire dans BigQuery
.apply("WriteToBQ", BigQueryIO.writeTableRows()
//Spécifiez le nom de la table de destination d'écriture
.to(TableDestination("dataset_name:table_name","description"))
//Définissez le schéma sur lequel écrire avec Object et transmettez-le
.withSchema(schema)
//Créer s'il n'y a pas de table(option)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
//Insérer des données à la fin du tableau (facultatif))
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
//Courir
p.run();
BigQueryRowConverter
) définie ci-dessusDans le cas d'une conversion utilisant ParDo, il hérite de DoFn et implémente la récupération de données ~ conversion en PCollection dans processElement qui est une méthode abstraite.
package com.mycompany.dataflow_sample.converter;
import com.google.api.services.bigquery.model.TableRow;
import com.google.gson.Gson;
import com.mycompany.dataflow_sample.entity.SampleInputJson;
import org.apache.beam.sdk.transforms.DoFn;
public class BigQueryRowConverter extends DoFn<String,TableRow> {
@ProcessElement
public void processElement(ProcessContext dofn) throws Exception {
//Recevoir des entrées
String json = dofn.element();
Gson gson = new Gson();
//Convertir json en objet
SampleInputJson jsonObj = gson.fromJson(json,SampleInputJson.class);
//Convertir le contenu de json en tableRow de bigquery
TableRow output = new TableRow();
TableRow attributesOutput = new TableRow();
TableRow attr2Output = new TableRow();
//Définir les données en sortie
attributesOutput.set("attr1", jsonObj.attributes.attr1);
attributesOutput.set("attr2", jsonObj.attributes.attr2);
attr2Output.set("attr2_prop1",jsonObj.attributes.attr2.prop1);
attr2Output.set("attr2_prop2",jsonObj.attributes.attr2.prop2);
attributesOutput .set("attr2",attr2Output);
output.set("attributes", attributesOutput );
output.set("name", jsonObj.name);
output.set("ts", jsonObj.timeStamp/1000);
//Production
dofn.output(output);
}
}
SampleSchemaFactory.create ()
ci-dessuspackage com.mycompany.dataflow_sample.schema;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.ArrayList;
import java.util.List;
public class SampleSchemaFactory {
public static TableSchema create() {
List<TableFieldSchema> fields;
fields = new ArrayList<> ();
fields.add(new TableFieldSchema().setName("name").setType("STRING"));
fields.add(new TableFieldSchema().setName("ts").setType("TIMESTAMP"));
fields.add(new TableFieldSchema().setName("attributes").setType("RECORD")
.setFields(new ArrayList<TableFieldSchema>() {
{
add(new TableFieldSchema().setName("attr1").setType("STRING"));
add(new TableFieldSchema().setName("attr2").setType("RECORD")
.setFields(new ArrayList<TableFieldSchema>() {
{
add(new TableFieldSchema().setName("prop1").setType("INTEGER"));
add(new TableFieldSchema().setName("prop2").setType("STRING"));
}
})
);
}
})
);
TableSchema schema = new TableSchema().setFields(fields);
return schema;
}
}
Comme ça.
Créez et exécutez simplement le code Java que vous avez écrit précédemment et la tâche sera en fait déployée sur GCP.
Envoyez un message depuis la console GCP.
La vérification de l'opération est OK si des données sont insérées dans BigQuery :)
Vous pouvez consulter le journal à partir de l'écran des détails en cliquant sur le travail créé sur la console. Les journaux sont également automatiquement transférés vers stackdriver, vous devez donc l'utiliser pour la surveillance et la surveillance.
Si une exception se produit lors de l'exécution de DataflowJob, ACK ne sera pas envoyé à PubSub, les données seront donc à nouveau récupérées.
Donc
L'exemple est ici. Je voulais le partitionner, donc je l'ai fait pour créer une table quotidienne basée sur le quotidien au moment de l'exécution.
Collection d'exemples de code officielle (github) apache beam sdk officiel google
Recommended Posts