[JAVA] Créer un pipeline ML à l'aide de Google Cloud Dataflow (1)

introduction

Cette section décrit comment créer un pipeline ML sur GCP.

L'exemple de code de cet article se trouve dans le référentiel suivant. https://github.com/tonouchi510/dataflow-sample

Produits GCP requis

Dans le cas d'un service qui utilise l'apprentissage automatique, vous souhaiterez peut-être effectuer des prédictions et un recyclage par apprentissage automatique et enregistrer les résultats chaque fois que de nouvelles données sont ajoutées. Ici, Cloud ML Engine ne suffit pas à lui seul pour le traitement prédictif qui nécessite un prétraitement et un post-traitement qui ne peuvent pas être effectués uniquement en entrant / sortant dans le modèle d'apprentissage automatique, et il est nécessaire de combiner Dataflow et autres. Cette fois, nous allons créer un pipeline pour l'ensemble du processus de prédiction de l'apprentissage automatique à l'aide du flux de données. Cependant, comme le montant sera important, divisez-le en deux parties.

Constitution

En combinant des produits GCP, vous pouvez créer un pipeline comme indiqué dans la figure ci-dessous. Cette fois, nous visons à créer un flux de "Pub / Sub" -> "Dataflow (pré-traitement)" -> "ML Engine" -> "Dataflow (post-traitement)" -> "GCS".

image.png

Créez un pipeline avec le flux suivant.

  1. Téléchargez de nouvelles données sur GCS (cette fois)
  2. La notification est envoyée à Cloud Pub / Sub (cette fois)
  3. Les tâches de flux de données déployées extraient des sujets Pub / Sub à intervalles réguliers (cette fois)
  4. Prétraitement par pipeline de flux de données
  5. Demande à ML Engine
  6. Post-traitement par pipeline de flux de données
  7. Sauvegarde des résultats dans GCS (cette fois)

Construction de pipelines

Maintenant, écrivons la procédure de construction.

1. Préparation du godet GCS

Préparez un compartiment pour le téléchargement des données d'entrée [1]. Ici, créez un bucket appelé dataflow-sample.

URL du bucket créé: gs: // dataflow-sample

2. Paramètres Pub / Sub

Créez un sujet Pub / Sub pour recevoir des notifications d'événements [2]. Vous pouvez le créer à partir de la barre de navigation de la console GCP, alors créez un sujet avec le nom «gcs-notify», par exemple. Cette fois, l'abonné n'est pas utilisé, il n'est donc pas nécessaire de le paramétrer.

3. Notifications Pub / Sub pour les paramètres GCS

Définissez «Notifications Cloud Pub / Sub pour Cloud Storage» dans le bucket que vous avez créé précédemment afin d'être informé des événements lorsque de nouvelles données sont importées [3]. Au moment de la rédaction de cet article, vous pouvez recevoir les types d'événements suivants, mais cette fois, nous souhaitons déclencher uniquement la création d'un nouveau fichier, spécifiez donc l'option permettant de définir uniquement OBJECT_FINALIZE.

image.png

De plus, puisque nous souhaitons enregistrer les résultats dans le même compartiment, nous spécifions également le dossier pour la surveillance des événements. La commande comprenant les paramètres ci-dessus est la suivante.

$ gsutil notification create -t [TOPIC_NAME] -f json -p [folder] -e OBJECT_FINALIZE gs://[BUCKET_NAME]

//Cette fois
$ gsutil notification create -t gcs-notify -f json -p data/ -e OBJECT_FINALIZE gs://dataflow-sample

Pour vérifier les paramètres, tapez la commande suivante.

$ gsutil notification list gs://[BUCKET_NAME]

Désormais, lorsque le fichier est téléchargé dans le compartiment configuré, ces informations seront envoyées à Pub / Sub et ajoutées à la rubrique gcs-notify sous la forme d'un message.

4. Créer un modèle de flux de données

Je souhaite exécuter un travail de flux de données déclenché par l'occurrence d'un événement Pub / Sub, mais je pense qu'il existe plusieurs façons de le faire.

Cette fois, nous allons utiliser la méthode de démarrage d'une tâche de traitement en continu à partir du modèle de flux de données.

À propos du flux de données [4]

Le pipeline est mis en œuvre à l'aide d'Apache beam, qui est en cours de développement par Google, et le flux de données peut être spécifié comme son Runner sur GCP. Le faisceau Apache lui-même améliore le concept de MapReduce et est plus facile à écrire, et peut être exécuté avec différents coureurs. Le flux de données lui-même est également évolutif et offre d'excellentes performances de traitement distribué, il serait donc très pratique que vous appreniez même à l'utiliser.

python SDK Au moment de la rédaction de cet article, python n'est compatible qu'avec la version 2.7, et les entrées de Pub / Sub ne fonctionnaient pas, et il y avait de nombreux problèmes, il n'est donc pas recommandé de le faire avec python.

Je l'ai également implémenté en python au début, mais pour les débutants dans le traitement distribué, le coût d'apprentissage du flux de données lui-même est élevé et je rencontre de nombreuses fois des problèmes non résolus.J'ai donc abandonné et suis passé à Java stable. Ensuite, j'ai pu le développer presque sans problème.

Java SDK Dans le cas de java, je pense qu'il est relativement stable et contient beaucoup de documentation. Le document officiel utilise maven, mais il est difficile à utiliser, donc je vais le construire avec gradle References.

Ajoutez simplement ce qui suit aux dépendances de build.gradle.

compile group: 'com.google.cloud.dataflow', name: 'google-cloud-dataflow-java-sdk-all', version: '2.5.0'

Implémentation du pipeline

Dans cet article, nous allons extraire un message de la rubrique Pub / Sub toutes les 30 secondes, extraire le chemin du fichier image nouvellement créé à partir du contenu du message, le rogner et l'enregistrer dans GCS. Veuillez vous référer au document (Références [5] [6]) pour l'explication d'Apache beam lui-même.

Le code ressemble à ceci:

PubsubToText.java



public class PubsubToText {

    public static void main(String[] args) {

        PipelineOptions options = PipelineOptionsFactory.create();

        DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
        dataflowOptions.setRunner(DataflowRunner.class);
        dataflowOptions.setProject("dataflow-sample");
        dataflowOptions.setStagingLocation("gs://dataflow-sample-bucket/staging");
        dataflowOptions.setTemplateLocation("gs://dataflow-sample-bucket/templates/MyTemplate");
        dataflowOptions.setStreaming(true);
        dataflowOptions.setNumWorkers(1);

        run(dataflowOptions);
    }

    public static PipelineResult run(DataflowPipelineOptions options) {
        String topic = "projects/dataflow-sample/topics/gcs-notify";
        String output = "gs://dataflow-sample-result/output.txt";

        Pipeline p = Pipeline.create(options);

        /*
         * Steps:
         *   1) Read string messages from PubSub
         *   2) Window the messages into minute intervals specified by the executor.
         *   3) Output the windowed files to GCS
         */
        p.apply("Read PubSub Events", PubsubIO.readMessagesWithAttributes().fromTopic(topic))
                .apply( "30s Window",
                        Window.into(FixedWindows.of(Duration.standardSeconds(60))))
                .apply("Load Image", ParDo.of(new LoadImageFn()))
                .apply("Write File(s)", TextIO.write()
                        .withWindowedWrites()
                        .withNumShards(1)
                        .to(output));

        return p.run();
    }
}

Le point important est de spécifier l'option avec dataflowOptions.setStreaming (true) pour le traitement du streaming, et de définir la fenêtre de l'intervalle de temps lors de l'implémentation du pipeline.

Les messages notifiés par les notifications Cloud Pub / Sub pour Cloud Storage sont au format suivant. J'écris un processus pour extraire les informations nécessaires d'ici avec ma propre méthode, lire l'image du chemin acquis sur GCS, la recadrer et la sauvegarder dans le répertoire de résultats.

image.png

LoadImageFn.java



public class LoadImageFn extends DoFn<PubsubMessage, String> {

    @ProcessElement
    public void processElement(@Element PubsubMessage m, OutputReceiver<String> out) {
        Map<String, String> attr = m.getAttributeMap();

        Storage storage = StorageOptions.getDefaultInstance().getService();
        BlobId blob = BlobId.of(attr.get("bucketId"), attr.get("objectId"));
        byte[] content = storage.readAllBytes(blob);
        InputStream is = new ByteArrayInputStream(content);

        BufferedImage img = null;
        try {
            img = ImageIO.read(is);
        } catch (IOException e) {
            e.printStackTrace();
        }

        //Coordonnées pour commencer la coupe
        int X = 50;
        int Y = 50;
        //Taille de coupe
        int W = 100;
        int H = 100;

        BufferedImage subimg;  //Classe de stockage d'image découpée
        try {
            assert img != null;
            subimg = img.getSubimage(X, Y, W, H);
        }
        catch ( RasterFormatException re ) {
            System.out.println( "La plage spécifiée est en dehors de la plage de l'image" );
            return;
        }

        BlobId blobId = BlobId.of(attr.get("bucketId"), "result/cropped_image.jpg ");
        BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("image/jpeg").build();

        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        BufferedOutputStream os = new BufferedOutputStream( bos );

        try {
            ImageIO.write(subimg, "jpeg", os);
        } catch (IOException e) {
            e.printStackTrace();
        }

        storage.create(blobInfo, bos.toByteArray());

        out.output(String.valueOf(img.getHeight()));
    }

}

À propos, même pour les vidéos, vous pouvez utiliser le client GCS et JavaCV (car il existe de nombreuses situations dans lesquelles vous souhaitez inclure le traitement ffmpeg dans le flux de données),

Désormais, lorsque vous exécutez le programme localement, le fichier modèle sera créé dans le compartiment GCS spécifié par dataflowOptions.setTemplateLocation (" gs: // dataflow-sample-bucket / templates / MyTemplate ").

$ gradle run

5. Créer une tâche de flux de données à partir d'un modèle

procédure

image.png

Vérifiez si le travail s'exécute lorsque vous téléchargez un nouveau fichier sur GCS. Étant donné que le sujet est extrait toutes les 30 secondes dans le processus de diffusion en continu, vous devez attendre un moment.

Lorsque l'exécution du travail est terminée, il est OK si l'image recadrée est créée dans GCS. Puisqu'il s'agit d'un travail de flux de données pour le traitement en continu, il ne se fermera pas même si un travail est terminé, et le travail sera exécuté si vous téléchargez à nouveau le fichier. Veuillez vérifier le fonctionnement.

Résumé

Une fois le nouveau fichier téléchargé sur GCS, PubSub a été notifié et le travail Dataflow a été exécuté et enregistré le résultat dans GCS.

J'ai écrit dans le pipeline de flux de données le processus de lecture de l'image à partir du chemin écrit dans le message de "Notifications Cloud Pub / Sub pour Cloud Storage" et de son enregistrement dans GCS. Le reste est donc le prétraitement, la prédiction et le post-traitement. Si vous ajoutez un pipeline à faire, tout le pipeline ML sera terminé. Puisqu'il est nécessaire d'utiliser ML Engine, etc., j'écrirai sur ces constructions à un autre moment.

Les références

[1] https://cloud.google.com/storage/docs/ [2] https://cloud.google.com/pubsub/docs/ [3] https://cloud.google.com/storage/docs/pubsub-notifications [4] https://cloud.google.com/dataflow/docs/ [5] https://beam.apache.org/documentation/ [6] https://beam.apache.org/documentation/programming-guide/

Recommended Posts

Créer un pipeline ML à l'aide de Google Cloud Dataflow (1)
Construire un pipeline CICD à l'aide de Docker (mémorandum personnel)