Eine einführende Zusammenfassung, die ich mir bei der Verwendung von Cloud DataFlow bei der Arbeit kurz angesehen habe. Ich bin kein Experte auf dem Gebiet der Datenverarbeitung, daher habe ich einige Fehler in der Beschreibung gemacht. Wenn du es hast, wirst du es lernen, wenn du es mir sagen kannst :)
Es handelt sich um einen vollständig verwalteten Dienst der Stream- / Batch-Datenverarbeitungs-Engine (Apache Beam), der von Google Cloud Platform bereitgestellt wird.
Was ist das? Weil es sich anfühlt wie Wenn Sie die Elemente grob zerlegen
Gefühl (*) Ab dem 02/02/2018 unterstützt nur Java Streams
Verwenden von Cloud Pub / Sub, dem gleichen GCP-Dienst, als Nachrichtenbus Durch die Kombination mit Datenspeichern wie CloudIoTCore (MQTT Protocol Bridge), GCS und BigQuery kann beispielsweise die Datenübertragung von Terminals empfangen, konvertiert und integriert werden.
Ein grobes Nutzungsmodell aus den Beispielbildern von Google.
Es ist schwer zu verstehen, wenn Sie es nicht tatsächlich verwenden. Dieses Mal werden wir es anhand dieses Ablaufs als Beispiel erstellen.
Da ich die Zusammenarbeit rund um PubSub schreiben wollte, habe ich den Zugang zu CloudPubSub festgelegt, aber natürlich ist es möglich, die Daten von Tabelle A und Tabelle B in BigQuery zu verarbeiten und in Tabelle C zu laden.
Laden Sie das SDK wie folgt in pom.xml herunter.
<dependency>
<groupId>com.google.cloud.dataflow</groupId>
<artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
<version>[2.2.0, 2.99)</version>
</dependency>
Wenn Sie einen Maven oder ähnliches haben, überprüfen Sie Offiziell auf weitere Informationen. Bitte.
Legen Sie zunächst die Optionen der zu verarbeitenden Pipeline fest.
DataflowPipelineOptions options = PipelineOptionsFactory.create()
.as(DataflowPipelineOptions.class);
//Geben Sie den verwendeten Projektnamen an
options.setProject("your gcp porject");
//Erstellen und spezifizieren Sie den GCS-Bucket, der von Dataflow für das Staging verwendet wird
options.setStagingLocation("gs://hoge/staging");
//Erstellen und geben Sie einen GCS-Bucket an, den Dataflow vorübergehend verwendet
options.setTempLocation("gs://hoge/tmp");
//Geben Sie den auszuführenden Läufer an. Geben Sie DataflowRunner an, wenn Sie GCP ausführen. DirectRunner für die lokale Ausführung
options.setRunner(DataflowRunner.class);
//Streaming aktivieren
options.setStreaming(true);
//Geben Sie den Namen zum Zeitpunkt des Betriebs an(Jobs mit demselben Namen können nicht gleichzeitig ausgeführt werden
options.setJobName("sample");
Es gibt auch eine Erklärung in Official, also nur eine grobe Übersicht.
(*) Im obigen Beispiel wollte ich die Einstellung vom Code trennen, daher verwende ich Javas ResourceBundle, aber wenn es nur funktioniert, funktioniert es mit dem direkten Wert.
Grundsätzlich das Folgende zum Programmieren von Datenflussjobs Sie werden sich mit dem Konzept beschäftigen.
PipeLine
Ein Objekt, das einen Verarbeitungsjob darstellt
Wenden Sie grundsätzlich den Verarbeitungsablauf (Eingabe / Konvertierung / Ausgabe) auf PipeLine an.
PCollection
Objekte, die Daten darstellen
Umwandlung
Verarbeitungsteil, der Eingabedaten in Ausgabedaten konvertiert
PipeLineI/O
Definition der Eingabe oder Ausgabe
Wenden Sie grundsätzlich den Konvertierungsprozess an, den Sie in PipeLine geschrieben haben, und die erforderliche Verarbeitung wie PipeLine I / O. Sie werden einen Job aufbauen.
Drei in der obigen Abbildung beschriebene Aktionen
Wird auf die Pipeline angewendet.
//Pipeline (zu verarbeitender Job))Objekt erstellen
Pipeline p = Pipeline.create(options);
TableSchema schema = SampleSchemaFactory.create();
//Wenden Sie den Verarbeitungsinhalt an
//Lesen Sie die Daten aus dem Pubsub-Abonnement
p.apply(PubsubIO.readStrings().fromSubscription("your pubsub subscription"))
//Geben Sie alle 5 Minuten Fenster an(Nicht benötigt)
.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(5))))
//Konvertierung für Eingabe von pubsub einstellen(Die Implementierung wird später beschrieben)
.apply(ParDo.of(new BigQueryRowConverter()))
//Stellen Sie ein, um in BigQuery zu schreiben
.apply("WriteToBQ", BigQueryIO.writeTableRows()
//Geben Sie den Namen der Schreibzieltabelle an
.to(TableDestination("dataset_name:table_name","description"))
//Definieren Sie das Schema, in das mit Object geschrieben werden soll, und übergeben Sie es
.withSchema(schema)
//Erstellen, wenn keine Tabelle vorhanden ist(Möglichkeit)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
//Daten am Ende der Tabelle einfügen (optional))
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
//Lauf
p.run();
BigQueryRowConverter
) oben eingestelltBei der Konvertierung mit ParDo erbt es DoFn und implementiert die Datenabrufkonvertierung in PCollection in processElement, einer abstrakten Methode.
package com.mycompany.dataflow_sample.converter;
import com.google.api.services.bigquery.model.TableRow;
import com.google.gson.Gson;
import com.mycompany.dataflow_sample.entity.SampleInputJson;
import org.apache.beam.sdk.transforms.DoFn;
public class BigQueryRowConverter extends DoFn<String,TableRow> {
@ProcessElement
public void processElement(ProcessContext dofn) throws Exception {
//Eingaben empfangen
String json = dofn.element();
Gson gson = new Gson();
//Konvertieren Sie json in ein Objekt
SampleInputJson jsonObj = gson.fromJson(json,SampleInputJson.class);
//Konvertieren Sie den Inhalt von json in tableRow of bigquery
TableRow output = new TableRow();
TableRow attributesOutput = new TableRow();
TableRow attr2Output = new TableRow();
//Stellen Sie die Daten in der Ausgabe ein
attributesOutput.set("attr1", jsonObj.attributes.attr1);
attributesOutput.set("attr2", jsonObj.attributes.attr2);
attr2Output.set("attr2_prop1",jsonObj.attributes.attr2.prop1);
attr2Output.set("attr2_prop2",jsonObj.attributes.attr2.prop2);
attributesOutput .set("attr2",attr2Output);
output.set("attributes", attributesOutput );
output.set("name", jsonObj.name);
output.set("ts", jsonObj.timeStamp/1000);
//Ausgabe
dofn.output(output);
}
}
SampleSchemaFactory.create ()
obenpackage com.mycompany.dataflow_sample.schema;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.ArrayList;
import java.util.List;
public class SampleSchemaFactory {
public static TableSchema create() {
List<TableFieldSchema> fields;
fields = new ArrayList<> ();
fields.add(new TableFieldSchema().setName("name").setType("STRING"));
fields.add(new TableFieldSchema().setName("ts").setType("TIMESTAMP"));
fields.add(new TableFieldSchema().setName("attributes").setType("RECORD")
.setFields(new ArrayList<TableFieldSchema>() {
{
add(new TableFieldSchema().setName("attr1").setType("STRING"));
add(new TableFieldSchema().setName("attr2").setType("RECORD")
.setFields(new ArrayList<TableFieldSchema>() {
{
add(new TableFieldSchema().setName("prop1").setType("INTEGER"));
add(new TableFieldSchema().setName("prop2").setType("STRING"));
}
})
);
}
})
);
TableSchema schema = new TableSchema().setFields(fields);
return schema;
}
}
So was.
Erstellen Sie einfach den Java-Code, den Sie zuvor geschrieben haben, und führen Sie ihn aus. Der Job wird dann tatsächlich auf GCP bereitgestellt.
Senden Sie eine Nachricht von der GCP-Konsole.
Die Funktionsprüfung ist in Ordnung, wenn Daten in BigQuery eingefügt werden :)
Sie können das Protokoll im Detailbildschirm überprüfen, indem Sie auf den auf der Konsole erstellten Job klicken. Protokolle werden automatisch an Stackdriver übertragen. Daher empfiehlt es sich, diese für die Überwachung und Überwachung zu verwenden.
Wenn beim Ausführen eines DataflowJob eine Ausnahme auftritt, wird ACK nicht an PubSub gesendet, sodass die Daten erneut abgerufen werden.
Deshalb
--PubSub-Nachrichten werden auch dann nicht verworfen, wenn die Verarbeitung im Ladeteil von BigQuery verloren geht. Sie werden daher automatisch wiederholt (es ist nicht erforderlich, einen erneuten Versuch auf Kommunikationsebene in Betracht zu ziehen).
Das Beispiel ist hier. Ich wollte es partitionieren, also habe ich es geschafft, eine tägliche Tabelle basierend auf der täglichen Tabelle zum Zeitpunkt der Ausführung zu erstellen.
Offizielle Beispielcode-Sammlung (Github) apache beam sdk Google-Beamter
Recommended Posts