Exécutez Cloud Dataflow (Python) depuis AppEngine

(Mis à jour après la version v2.0.0 du 31/05)

Étant donné que la version Cloud Dataflow Python est enfin GA, j'ai essayé de voir si l'exécution du modèle pouvait être effectuée avec la version Python, mais j'ai trouvé que le pipeline pré-enregistré ~~ (bien que les paramètres ne puissent pas être transmis à partir du 23/03) ~~ (31/05) (Il est maintenant possible de passer des paramètres dans la version 2.0.0) J'ai pu démarrer à partir d'AppEngine et je souhaite partager la procédure.

Qu'est-ce que l'exécution du modèle Cloud Dataflow?

Il s'agit d'une fonction qui vous permet d'enregistrer à l'avance le pipeline Dataflow dans GCS et d'exécuter le pipeline enregistré en passant des paramètres à tout moment. En appelant l'exécution de modèle via AppEngine, vous pouvez facilement exécuter le traitement des données et le traitement d'analyse à partir du programme sans avoir à configurer vous-même un serveur pour démarrer le pipeline. Vous pouvez également utiliser cron pour exécuter régulièrement le pipeline d'analyse des données.

Il peut ne pas être beaucoup utilisé au stade des essais et des erreurs pour améliorer la précision par l'apprentissage automatique, mais lorsqu'il entre en fonctionnement réel, il est possible d'exécuter un pipeline de traitement de données compliqué sans se soucier du fonctionnement du serveur. Je pense que ce sera beaucoup plus facile pour l'opérateur. De plus, il semble que le développement sera plus facile si le pipeline utilisé pendant les essais et erreurs peut être mis en service réel sous une forme proche de celle-là. (Outre la difficulté de créer un modèle de haute précision, il devrait être assez difficile de façonner le modèle d'apprentissage automatique créé comme un système fonctionnant de manière stable)

Méthode d'exécution

L'exécution du modèle Dataflow prend les étapes suivantes:

Ci-dessous, je voudrais expliquer la procédure pour chaque étape.

Correction de l'utilisation de paramètres dans le pipeline

Définit une classe d'options personnalisée pour recevoir les paramètres transmis de l'extérieur. Dans Beam, le programme fait référence aux paramètres transmis de l'extérieur lors de l'exécution via la classe ValueProvider. La classe PipelineOptions a son propre analyseur avec une méthode add_value_provider_argument pour lire les paramètres en tant que ValueProvider. Créez une classe d'options personnalisée qui hérite de la classe PipelineOptions et décrivez les paramètres des paramètres que vous souhaitez ajouter à l'analyseur dans la méthode _add_argparse_args appelée lors de l'initialisation. Dans l'exemple ci-dessous, l'entrée, la sortie et la date sont spécifiées en tant que paramètres personnalisés.

pipeline.py


import apache_beam as beam

class MyOptions(PipelineOptions):

    @classmethod
    def _add_argparse_args(cls, parser):

        parser.add_value_provider_argument(
            '--input',
            default="gs://{mybucket}/{pathtofile}",
            help='input gcs file path')
        
        parser.add_value_provider_argument(
            '--output',
            default="gs://{mybucket}/{pathtofile}",
            help='output gcs file path')

        parser.add_value_provider_argument(
            '--date',
            default="20170531",
            help='today')

options = MyOptions()

Dans le traitement du pipeline, modifiez-le pour que la valeur de ValueProvider soit utilisée. ValueProvider peut être référencé en tant que variable pour l'option personnalisée créée précédemment. Le programme doit obtenir la valeur différemment via ValueProvider et obtenir la valeur avec .get (). Si vous souhaitez utiliser ValueProvider dans le traitement interne PTransform ou DoFn, passez ValueProvider dans le constructeur, conservez-le en tant que variable d'instance et utilisez .get () en interne pour y faire référence. Notez que les classes ReadFromText et WriteToText fournies par Beam peuvent directement passer ValueProvider comme argument. Dans l'exemple suivant, le fichier de destination spécifié par l'entrée de paramètre externe est lu, chaque ligne est remplacée par la chaîne de caractères spécifiée par date et le fichier est écrit dans le chemin spécifié par la sortie.

pipeline.py


class MyDoFn(beam.DoFn):

    #Recevoir ValueProvider dans le constructeur et définir la variable d'instance
    def __init__(self, date):
        self._date = date
    
    def process(self, element):
        yield self._date.get() #La valeur est.get()Obtenir par méthode


p = beam.Pipeline(options=options)

(p | "Read"  >> beam.io.ReadFromText(options.input)
   | "DoFn"  >> beam.ParDo(MyDoFn(options.date))  #Passer ValueProvider au constructeur DoFn
   | "Write" >> beam.io.WriteToText(options.output))

p.run()

Enregistrer le pipeline avec GCS

Si vous ajoutez le chemin GCS auquel le modèle est enregistré dans Google Cloud Options et que vous l'exécutez, le pipeline sera exécuté à la place, mais le fichier modèle qui décrit le contenu de traitement du pipeline sera enregistré dans le chemin GCS spécifié. L'environnement d'exécution devrait convenir partout où Dataflow Runner peut exécuter le pipeline.

pipeline.py


options = MyOptions()

#Spécifiez DataflowRunner pour le coureur
options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DataflowRunner'

# template_Spécifiez le chemin GCS pour enregistrer le modèle à l'emplacement
google_cloud_options = options.view_as(beam.options.pipeline_options.GoogleCloudOptions)
google_cloud_options.template_location = 'gs://{your bucket}/mytemplate'

#Exécutez le pipeline
p = beam.Pipeline(options=options)

~Code de traitement du pipeline~

p.run().wait_until_finish()

Lorsqu'il est exécuté ci-dessus, un fichier modèle qui décrit le contenu de traitement du pipeline est généré dans le chemin GCS spécifié.

Exécutez le pipeline enregistré

L'exécution du modèle enregistré envoie des instructions à Google REST API. La bibliothèque cliente Google Cloud ne semble pas prise en charge (à partir de mars 2017), nous allons donc utiliser la bibliothèque cliente des API Google ici. Avant de l'utiliser, installez la bibliothèque cliente d'API de Dataflow (la v1b3 semble être la dernière version du 23/03). Si vous spécifiez le chemin GCS créé lors de l'exécution ci-dessus dans gcsPath du corps du paramètre de requête et que vous l'exécutez, le travail sera généré et exécuté à partir du modèle. Vous trouverez ci-dessous un exemple de code pour Go et Python, mais vous devriez également pouvoir l'exécuter à partir de Library dans d'autres langues. (J'ai essayé la version Python localement, mais je ne l'ai pas essayée sur AppEngine, alors faites-le moi savoir s'il y a un problème)

Go


import (
  "net/http"
  "golang.org/x/net/context"
  "golang.org/x/oauth2"
  "golang.org/x/oauth2/google"
  "google.golang.org/appengine"
  dataflow "google.golang.org/api/dataflow/v1b3"
  "google.golang.org/appengine/urlfetch"
)
//Omission
func handler(w http.ResponseWriter, r *http.Request) {
  c := appengine.NewContext(r)
  client := &http.Client{
    Transport: &oauth2.Transport{
      Source: google.AppEngineTokenSource(c, "https://www.googleapis.com/auth/cloud-platform"),
      Base:   &urlfetch.Transport{Context: c},
    },
  }

  service, err := dataflow.New(client)
  templates := dataflow.NewProjectsTemplatesService(service)
  req := &dataflow.CreateJobFromTemplateRequest{
    GcsPath: "gs://{your bucket}/mytemplate",
    JobName: "sklearn",
    Parameters: map[string]string{
      "input": "gs://{yourbucket}/{pathtofile1}",
      "output": "gs://{yourbucket}/{pathtofile2}",
      "date": "20170601",
    },
  }
  job, err := templates.Create("{your project}", req).Do()
  //Omission
}

Python


from oauth2client.client import GoogleCredentials
from oauth2client.service_account import ServiceAccountCredentials
from apiclient.discovery import build

credentials = GoogleCredentials.get_application_default()
service = build("dataflow", "v1b3", credentials=credentials)
templates = service.projects().templates()

body = {
    "environment": {
        "bypassTempDirValidation": False,
        "tempLocation": "gs://{your bucket}/temp",
        #"serviceAccountEmail": "A String",
        #"zone": "us-central1-f",
        "maxWorkers": 1,
    },
    "gcsPath": "gs://{your bucket}/mytemplate",
    "parameters": {
      "input": "gs://{yourbucket}/{pathtofile1}",
      "output": "gs://{yourbucket}/{pathtofile2}",
      "date": "20170601",
    },
    "jobName": "sklearn",
}
req = templates.create(projectId="{your project}", body=body)
res = req.execute()

Il semble que les seuls éléments requis dans le corps sont gcsPath et jobName. Il semble que jobName devrait contenir une chaîne de caractères unique au travail en cours d'exécution. paramètre spécifie le paramètre d'exécution que vous souhaitez transmettre au pipeline au moment de l'exécution. La réponse contient l'ID du travail, donc conservez-le si vous souhaitez annuler le travail ultérieurement.

À propos, le pipeline enregistré peut également être exécuté à partir de la console. Vous pouvez également exécuter un travail en sélectionnant un modèle personnalisé sur l'écran ci-dessous qui passe de [+ EXÉCUTER JOB] en haut de l'écran de la console Dataflow, et en spécifiant le chemin GCS du modèle enregistré. dataflowjob.png

Arrêtez le pipeline en cours d'exécution

Si vous démarrez une tâche dans le pipeline mais que vous trouvez un problème, ou si vous souhaitez la démarrer uniquement pendant une durée spécifiée en mode de diffusion en continu, vous devez arrêter la tâche et démarrer la tâche à partir du modèle. Lors de l'arrêt, indiquez l'état "JOB_STATE_CANCELLED" dans la même API REST Dataflow et mettez à jour le travail. Voici un exemple de code Python.

Python


jobs= service.projects().jobs()

body = {
    "requestedState": "JOB_STATE_CANCELLED"
}
req = jobs.update(projectId={your project}, jobId={job ID}, body=body)
res = req.execute()

Cela annulera le travail et supprimera le cluster démarré.

en conclusion

Vous pouvez également exécuter périodiquement un pipeline d'analyse de données créé à l'avance à l'aide de cron, etc. à partir d'AppEngine. Cela a élargi la gamme d'utilisation des flux de données non seulement pour le prétraitement dans la phase de vérification de l'analyse des données, mais également pour la collecte et le traitement des données dans la phase d'exploitation. Comme vous pouvez facilement créer un pipeline, en écrivant un flux de travail de prétraitement qui suppose la collecte de données pendant le fonctionnement, même dans la phase de vérification du système d'analyse de données, les données qui étaient censées être utilisées au moment de la vérification sont acquises au moment du développement du système. Vous pouvez faciliter l'identification rapide des problèmes tels que les pleurs et la refonte de la modélisation lorsque vous constatez que le coût est étonnamment élevé.

Je pense que l'une des fonctionnalités de GCP est que les développeurs d'applications et les ingénieurs en apprentissage automatique peuvent se concentrer sur le développement et l'analyse des données en laissant le côté cloud prendre en charge la construction et l'exploitation des infrastructures problématiques. Je vais. Je m'attends à ce que Dataflow assume le rôle de construction et d'exploitation d'un pipeline de traitement de données, ce qui a tendance à être gênant dans l'analyse des données, tout comme AppEngine l'était dans le développement d'applications Web.

for Java

Exécutez Cloud Dataflow pour Java à partir d'App Engine for Go avec les paramètres d'exécution

Recommended Posts

Exécutez Cloud Dataflow (Python) depuis AppEngine
Exécutez XGBoost avec Cloud Dataflow (Python)
[Python] Exécutez Flask sur Google App Engine
Exécutez Python à partir d'Excel
Tutoriel Cloud Run (python)
Exécutez un pipeline de machine learning avec Cloud Dataflow (Python)
Exécutez le script illustrator à partir de python
Comment activer python3 pour exécuter des tâches lors de l'envoi de tâches de GCP Cloud Composer vers Dataflow
Prise en charge de Java 1 1 par Google App Engine
Utiliser Cloud Storage depuis Python3 (Introduction)
Exécutez Aprili depuis Python sur Orange
Détection d'erreur Python exécutée à partir de Powershell
Exécutez des scripts Python de manière synchrone à partir de C #
Exécutez Ansible à partir de Python à l'aide de l'API
Utiliser Cloud Datastore depuis Compute Engine
Exécutez le script Python à partir de Cisco Memorandum_EEM
Utiliser l'API Google Cloud Vision de Python
Utilisation de ImageField de Django avec AppEngine / Python
Exploitez le stockage d'objets cloud de Sakura à partir de Python
Exécuter des scripts Python à partir d'applications C # GUI
Firebase: utilisez Cloud Firestore et Cloud Storage depuis Python
Accéder à Cloud Storage à partir d'une instance Compute Engine
sql à sql
Tweet (API 1.1) avec Google App Engine pour Python
MeCab de Python
Exécutez des fichiers Python à partir de HTML en utilisant Django
Exécutez des scripts Python à partir d'Excel (en utilisant xlwings)
Exécutez l'application flask sur Cloud 9 et Apache Httpd
Exécutez Python à partir d'Excel VBA avec xlwings et un supplément de tutoriel
Développement d'applications pour tweeter en Python à partir de Visual Studio 2017
Déployer l'application Django sur Google App Engine (Python3)
Procédure de construction de l'environnement de développement Google App Engine / Python (fin 2014)
Étapes de l'installation de Python 3 à la création d'une application Django
PIL en Python sur Windows8 (pour Google App Engine)
Premiers pas avec Google App Engine pour Python et PHP
Exemple d'apprentissage SPA (Angular2 + Bootstrap / App Engine / Python + webapp2)
Comment utiliser Django avec Google App Engine / Python
Version d'exécution de l'environnement standard Google App Engine / Python
python + sélénium + safari-run iphone safari de mac avec webdriver
Générez Word Cloud à partir de données de cas d'essai avec python3
Utilisez Thingsspeak de Python
Exécutez Python avec VBA
Exploitez Filemaker depuis Python
Utiliser fluentd de python
Exécutez prepDE.py avec python3
Accéder à bitcoind depuis python
Changements de Python 3.0 à Python 3.5
Changements de Python 2 à Python 3.0
Python depuis ou import
Utilisez MySQL depuis Python
Installer Python à partir de la source
Exécuter des commandes depuis Python
Exécutez Blender avec python
Faites fonctionner le neutron de Python!
Cloud Dataflow Super Primer
Utiliser MySQL depuis Python
Faire fonctionner LXC depuis Python
Manipuler riak depuis python
Moteur de template Jinja2 2 Python