[JAVA] Erstellen einer ML-Pipeline mit Google Cloud Dataflow (1)

Einführung

In diesem Abschnitt wird beschrieben, wie Sie eine ML-Pipeline auf GCP erstellen.

Der Beispielcode für diesen Artikel befindet sich im folgenden Repository. https://github.com/tonouchi510/dataflow-sample

Erforderliche GCP-Produkte

Bei einem Dienst, der maschinelles Lernen verwendet, möchten Sie möglicherweise Vorhersagen und Umschulungen durch maschinelles Lernen treffen und die Ergebnisse jedes Mal speichern, wenn neue Daten hinzugefügt werden. Wenn Sie eine vorausschauende Verarbeitung durchführen möchten, die eine Vor- und Nachbearbeitung erfordert, die nicht nur durch Eingabe / Ausgabe in das maschinelle Lernmodell abgeschlossen werden kann, reicht Cloud ML Engine allein nicht aus, und es ist erforderlich, den Datenfluss usw. zu kombinieren. Dieses Mal erstellen wir eine Pipeline für den gesamten Vorhersageprozess des maschinellen Lernens mithilfe des Datenflusses. Da die Menge jedoch groß sein wird, teilen Sie sie in zwei Teile.

Verfassung

Durch die Kombination von GCP-Produkten können Sie eine Pipeline erstellen, wie in der folgenden Abbildung dargestellt. Dieses Mal wollen wir einen Fluss von "Pub / Sub" -> "Datenfluss (Vorverarbeitung)" -> "ML Engine" -> "Datenfluss (Nachbearbeitung)" -> "GCS" erstellen.

image.png

Erstellen Sie eine Pipeline mit dem folgenden Ablauf.

  1. Laden Sie neue Daten in GCS hoch (diesmal)
  2. Die Benachrichtigung geht an Cloud Pub / Sub (diesmal)
  3. Bereitgestellte Datenflussjobs ziehen in regelmäßigen Abständen (diesmal) Pub / Sub-Themen ab.
  4. Vorverarbeitung durch Datenfluss-Pipeline
  5. Anfrage an ML Engine
  6. Nachbearbeitung per Datenfluss-Pipeline
  7. Speichern der Ergebnisse in GCS (diesmal)

Pipelinebau

Schreiben wir nun die Konstruktionsprozedur.

1. Vorbereitung des GCS-Eimers

Bereiten Sie einen Bucket zum Hochladen von Eingabedaten vor [1]. Erstellen Sie hier einen Bucket namens dataflow-sample.

URL des erstellten Buckets: gs: // dataflow-sample

2. Pub / Sub-Einstellungen

Erstellen Sie ein Pub / Sub-Thema, um Ereignisbenachrichtigungen zu erhalten [2]. Sie können es über die Navigationsleiste der GCP-Konsole erstellen. Erstellen Sie also beispielsweise ein Thema mit dem Namen "gcs-notify". Dieses Mal wird der Teilnehmer nicht verwendet, daher muss er nicht eingestellt werden.

3. Pub / Sub-Benachrichtigungen für GCS-Einstellungen

Stellen Sie in dem zuvor erstellten Bucket "Cloud Pub / Sub-Benachrichtigungen für Cloud-Speicher" ein, damit Sie über Ereignisse informiert werden, wenn neue Daten hochgeladen werden [3]. Zum Zeitpunkt des Schreibens dieses Artikels können Sie die folgenden Ereignistypen empfangen. Dieses Mal möchten wir jedoch nur die Erstellung einer neuen Datei auslösen. Geben Sie daher die Option an, nur OBJECT_FINALIZE festzulegen.

image.png

Da wir die Ergebnisse im selben Bucket speichern möchten, geben wir auch den Ordner für die Ereignisüberwachung an. Der Befehl einschließlich der obigen Einstellungen lautet wie folgt.

$ gsutil notification create -t [TOPIC_NAME] -f json -p [folder] -e OBJECT_FINALIZE gs://[BUCKET_NAME]

//Diesmal
$ gsutil notification create -t gcs-notify -f json -p data/ -e OBJECT_FINALIZE gs://dataflow-sample

Geben Sie den folgenden Befehl ein, um die Einstellungen zu überprüfen.

$ gsutil notification list gs://[BUCKET_NAME]

Wenn die Datei in den konfigurierten Bucket hochgeladen wird, werden diese Informationen an Pub / Sub gesendet und in Form einer Nachricht zum Thema "gcs-notify" hinzugefügt.

4. Erstellen Sie eine Datenflussvorlage

Ich möchte einen Datenflussjob ausführen, der durch das Auftreten eines Pub / Sub-Ereignisses ausgelöst wird, aber ich denke, es gibt verschiedene Möglichkeiten, dies zu tun.

Dieses Mal verwenden wir die Methode zum Starten eines Streaming-Verarbeitungsjobs aus der Datenflussvorlage.

Über den Datenfluss [4]

Die Pipeline wird mithilfe von Apache Beam implementiert, der von Google entwickelt wird, und der Datenfluss kann als Runner in GCP angegeben werden. Der Apache-Strahl selbst hat das Konzept von MapReduce verbessert, um das Schreiben zu vereinfachen, und kann mit verschiedenen Läufern ausgeführt werden. Der Datenfluss selbst ist ebenfalls skalierbar und weist eine hervorragende verteilte Verarbeitungsleistung auf. Daher wäre es sehr praktisch, wenn Sie sogar lernen würden, wie man ihn verwendet.

python SDK Zum Zeitpunkt des Schreibens ist Python nur mit Version 2.7 kompatibel, und die Eingabe von Pub / Sub funktionierte nicht, und es gab viele Probleme. Daher wird nicht empfohlen, dies mit Python zu tun.

Anfangs habe ich es auch in Python implementiert, aber für Anfänger in der verteilten Verarbeitung sind die Lernkosten des Datenflusses selbst hoch und ich stoße oft auf ungelöste Probleme. Deshalb habe ich aufgegeben und auf stabiles Java umgestellt. Dann konnte ich es fast ohne Probleme entwickeln.

Java SDK Im Fall von Java denke ich, dass es relativ stabil ist und viel Dokumentation hat. Das offizielle Dokument verwendet maven, aber es ist schwierig zu verwenden, daher werde ich es mit gradle Referenzen erstellen.

Fügen Sie einfach Folgendes zu den Abhängigkeiten von build.gradle hinzu.

compile group: 'com.google.cloud.dataflow', name: 'google-cloud-dataflow-java-sdk-all', version: '2.5.0'

Pipeline-Implementierung

In diesem Artikel ziehen wir alle 30 Sekunden eine Nachricht aus dem Pub / Sub-Thema, extrahieren den Pfad der neu erstellten Bilddatei aus dem Inhalt der Nachricht, schneiden sie zu und speichern sie in GCS. Weitere Informationen zum Apache Beam selbst finden Sie im Dokument (Referenzen [5] [6]).

Der Code sieht folgendermaßen aus:

PubsubToText.java



public class PubsubToText {

    public static void main(String[] args) {

        PipelineOptions options = PipelineOptionsFactory.create();

        DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
        dataflowOptions.setRunner(DataflowRunner.class);
        dataflowOptions.setProject("dataflow-sample");
        dataflowOptions.setStagingLocation("gs://dataflow-sample-bucket/staging");
        dataflowOptions.setTemplateLocation("gs://dataflow-sample-bucket/templates/MyTemplate");
        dataflowOptions.setStreaming(true);
        dataflowOptions.setNumWorkers(1);

        run(dataflowOptions);
    }

    public static PipelineResult run(DataflowPipelineOptions options) {
        String topic = "projects/dataflow-sample/topics/gcs-notify";
        String output = "gs://dataflow-sample-result/output.txt";

        Pipeline p = Pipeline.create(options);

        /*
         * Steps:
         *   1) Read string messages from PubSub
         *   2) Window the messages into minute intervals specified by the executor.
         *   3) Output the windowed files to GCS
         */
        p.apply("Read PubSub Events", PubsubIO.readMessagesWithAttributes().fromTopic(topic))
                .apply( "30s Window",
                        Window.into(FixedWindows.of(Duration.standardSeconds(60))))
                .apply("Load Image", ParDo.of(new LoadImageFn()))
                .apply("Write File(s)", TextIO.write()
                        .withWindowedWrites()
                        .withNumShards(1)
                        .to(output));

        return p.run();
    }
}

Der wichtige Punkt ist, die Option mit "dataflowOptions.setStreaming (true)" für die Streaming-Verarbeitung anzugeben und das Zeitintervallfenster bei der Implementierung der Pipeline festzulegen.

Die von Cloud Pub / Sub Notifications for Cloud Storage benachrichtigten Nachrichten haben das folgende Format. Ich schreibe einen Prozess, um die erforderlichen Informationen mit meiner eigenen Methode von hier zu extrahieren, das Bild aus dem erfassten Pfad in GCS zu lesen, es zuzuschneiden und im Ergebnisverzeichnis zu speichern.

image.png

LoadImageFn.java



public class LoadImageFn extends DoFn<PubsubMessage, String> {

    @ProcessElement
    public void processElement(@Element PubsubMessage m, OutputReceiver<String> out) {
        Map<String, String> attr = m.getAttributeMap();

        Storage storage = StorageOptions.getDefaultInstance().getService();
        BlobId blob = BlobId.of(attr.get("bucketId"), attr.get("objectId"));
        byte[] content = storage.readAllBytes(blob);
        InputStream is = new ByteArrayInputStream(content);

        BufferedImage img = null;
        try {
            img = ImageIO.read(is);
        } catch (IOException e) {
            e.printStackTrace();
        }

        //Koordinaten, um mit dem Schneiden zu beginnen
        int X = 50;
        int Y = 50;
        //Schnittgröße
        int W = 100;
        int H = 100;

        BufferedImage subimg;  //Ausgeschnittene Bildspeicherklasse
        try {
            assert img != null;
            subimg = img.getSubimage(X, Y, W, H);
        }
        catch ( RasterFormatException re ) {
            System.out.println( "Der angegebene Bereich liegt außerhalb des Bildbereichs" );
            return;
        }

        BlobId blobId = BlobId.of(attr.get("bucketId"), "result/cropped_image.jpg ");
        BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("image/jpeg").build();

        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        BufferedOutputStream os = new BufferedOutputStream( bos );

        try {
            ImageIO.write(subimg, "jpeg", os);
        } catch (IOException e) {
            e.printStackTrace();
        }

        storage.create(blobInfo, bos.toByteArray());

        out.output(String.valueOf(img.getHeight()));
    }

}

Übrigens können Sie auch für Videos den GCS-Client und JavaCV verwenden (da es viele Situationen gibt, in denen Sie die ffmpeg-Verarbeitung in den Datenfluss einbeziehen möchten).

Wenn Sie das Programm jetzt lokal ausführen, wird die Vorlagendatei in dem GCS-Bucket erstellt, der durch "dataflowOptions.setTemplateLocation" ("gs: // Datenfluss-Beispiel-Bucket / Templates / MyTemplate") angegeben wird.

$ gradle run

5. Erstellen Sie einen Datenflussjob aus einer Vorlage

Verfahren

image.png

Überprüfen Sie, ob der Job ausgeführt wird, wenn Sie eine neue Datei in GCS hochladen. Da das Thema beim Streaming alle 30 Sekunden abgerufen wird, müssen Sie eine Weile warten.

Wenn die Jobausführung abgeschlossen ist, ist es in Ordnung, wenn das zugeschnittene Bild in GCS erstellt wird. Da es sich um einen Datenflussjob für die Streaming-Verarbeitung handelt, wird er auch dann nicht geschlossen, wenn ein Job abgeschlossen ist, und der Job wird ausgeführt, wenn Sie die Datei erneut hochladen. Bitte überprüfen Sie den Betrieb.

Zusammenfassung

Nachdem die neue Datei in GCS hochgeladen wurde, wurde PubSub benachrichtigt, und der Datenflussjob wurde ausgeführt und das Ergebnis in GCS gespeichert.

Ich habe den Prozess des Lesens des Bildes aus dem Pfad geschrieben, der in der Nachricht aus "Cloud Pub / Sub-Benachrichtigungen für Cloud-Speicher" geschrieben ist, und es in GCS in der Datenfluss-Pipeline gespeichert. Der Rest ist also Vorverarbeitung, Vorhersage, Nachbearbeitung Wenn Sie eine Pipeline hinzufügen, wird die gesamte ML-Pipeline fertiggestellt. Da es notwendig ist, ML Engine usw. zu verwenden, werde ich zu einem anderen Zeitpunkt über diese Konstruktionen schreiben.

Verweise

[1] https://cloud.google.com/storage/docs/ [2] https://cloud.google.com/pubsub/docs/ [3] https://cloud.google.com/storage/docs/pubsub-notifications [4] https://cloud.google.com/dataflow/docs/ [5] https://beam.apache.org/documentation/ [6] https://beam.apache.org/documentation/programming-guide/

Recommended Posts

Erstellen einer ML-Pipeline mit Google Cloud Dataflow (1)
Erstellen einer CICD-Pipeline mit Docker (persönliches Memorandum)