Utilisez Cloud Dataflow pour modifier dynamiquement la destination en fonction de la valeur des données et enregistrez-la dans GCS

introduction

Cet article est l'article du 23ème jour du Calendrier de l'Avent Classi 2019.

Bonjour, c'est @tomoyanamekawa de données AI faisant partie de Classi. Je crée généralement une plateforme d'analyse de données sur GCP.

Récemment, il y a eu un cas où "je veux diviser les données dans BigQuery en fichiers en fonction des valeurs à l'intérieur et les enregistrer dans GCS", et à ce moment-là, j'ai été pris en charge par Cloud Dataflow. Il semble y avoir une demande de la part d'autres personnes, et il y avait peu d'exemples d'implémentation en Python, donc je vais le résumer.

Ce but

Exécutez quotidiennement le processus d'exportation d'une table spécifique dans BigQuery vers Google Cloud Storage (GCS). Cependant, je souhaite modifier le répertoire de destination de l'enregistrement en fonction de la valeur d'une certaine colonne. Le format de fichier est json.

Exemple

Tableau des réservations dans BigQuery reservationsテーブル Je veux enregistrer dans GCS séparément pour chaque date / shop_id comme celui-ci. reservations_GCS

Dessin d'achèvement

完成図

environnement

Qu'est-ce que Cloud Dataflow?

C'est un service qui peut effectuer un traitement ETL sans serveur fourni par GCP. En coulisse, Apache Beam est en cours d'exécution, on peut donc dire que c'est un service qui peut utiliser Apache Beam sans serveur. Étant donné que le traitement parallèle peut être effectué, même des données à grande échelle peuvent être traitées à grande vitesse.

Il prend en charge à la fois le traitement de flux et le traitement par lots, mais cette fois, nous utiliserons le traitement par lots. Pour plus d'informations, veuillez visiter la page officielle.

Pour ceux qui veulent pouvoir l'utiliser pour le moment, je pense que cette procédure dans le matériel de présentation de M. Yuzutaso est bonne (je l'attrape aussi avec ça) Il a été téléchargé).

Créer un modèle personnalisé

Cloud Dataflow utilise ce que l'on appelle un "modèle" pour créer un processus ETL. Pour un traitement général, utilisez les Modèles fournis par Google pour faciliter la tâche sur une base graphique. Je peux le faire. Cependant, je ne peux pas faire ce que je veux faire cette fois, je vais donc créer moi-même un modèle personnalisé.

À propos, Java ou Python peut être utilisé comme langage de programmation. Cette fois, j'écrirai en Python, mais Java a plus de fonctions et de documentation, donc si vous ou les membres de votre équipe pouvez écrire Java et qu'il n'y a pas de problèmes de maintenance, je pense que Java est meilleur.

Voici le contenu du modèle personnalisé.

test_template.py


import os
import json
import datetime

import apache_beam as beam
from apache_beam.io import fileio
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions, GoogleCloudOptions

        
class JsonSink(fileio.TextSink):
    def write(self, record):
      self._fh.write(json.dumps(record).encode('utf8'))
      self._fh.write('\n'.encode('utf8'))


if __name__ == '__main__':
    now = datetime.datetime.now().strftime('%Y%m%d')
    project_id = 'your_project'
    dataset_name = 'your_dataset'
    table_name = 'your_table'
    bucket_name = 'your_bucket'

    #option
    pipeline_options = PipelineOptions()
    google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
    google_cloud_options.project = project_id
    google_cloud_options.job_name = 'myjob'
    google_cloud_options.staging_location = f'gs://{bucket_name}/staging'
    google_cloud_options.temp_location = f'gs://{bucket_name}/temp'
    google_cloud_options.template_location = f'gs://{bucket_name}/templates/test_template'
    pipeline_options.view_as(StandardOptions).runner = 'DataflowRunner'
        
    #Créer un pipeline
    pipeline = beam.Pipeline(options=pipeline_options)
    (pipeline 
        | 'read' >> beam.io.Read(beam.io.BigQuerySource(
            project=project_id, 
            use_standard_sql=True, 
            query=f'select * from `{project_id}.{dataset_name}.{table_name}`'
        ))
        | 'write' >> beam.io.fileio.WriteToFiles(
            path=f'gs://{bucket_name}/{now}',
            destination=lambda record, table_name=table_name: f"shop_id_{record['shop_id']}/",
            sink=JsonSink(),
            file_naming=beam.io.fileio.destination_prefix_naming()
        )
    )

    pipeline.run()

Le fait est que nous utilisons cette fonction Dynamic Destinations. Puisque la valeur de chaque enregistrement est stockée dans la variable appelée record, vous pouvez changer la destination (nom du fichier de destination) pour chaque enregistrement avec record ['shop_id'].

Puisque le modèle créé doit être placé sur GCS, exécutez cette commande.

python -m test_template

Ensuite, le modèle sera placé à l'emplacement spécifié par google_cloud_options.template_location. Vous pouvez également définir l'emplacement du modèle lors de l'exécution.

Faites-le fonctionner quotidiennement

Cloud Dataflow lui-même n'a pas de fonction de planificateur, il doit donc être exécuté en externe pour fonctionner quotidiennement. Par conséquent, cette fois, nous activerons l'exécution sans serveur avec Cloud Scheduler + Cloud Pub / Sub + Cloud Functions. schedulerの構成

Enregistrez le script suivant dans Cloud Functions. Ce script exécutera le modèle personnalisé pour vous.

from googleapiclient.discovery import build
def main(data, context):
    job = 'my_job'
    dataflow = build('dataflow', 'v1b3')
    request = dataflow.projects().templates().launch(
        projectId='your_project',
        gcsPath='gs://your_bucket/templates/test_template'
    )
    response = request.execute()

Les déclencheurs Cloud Functions sont Pub / Sub. De plus, lorsque vous utilisez Pub / Sub comme déclencheur, il est nécessaire de recevoir deux arguments, il est donc défini comme main (données, contexte).

Tout ce que vous avez à faire est de créer un sujet Pub / Sub qui est le déclencheur et de publier ce sujet quotidiennement à partir de Cloud Scheduler.

Si vous configurez Cloud Composer ou un serveur et que vous le planifiez avec d'autres moteurs de flux de travail ou cron, vous pouvez exécuter un modèle personnalisé à partir de la commande gcloud ci-dessous.

gcloud dataflow jobs run my_job \
        --gcs-location gs://your_bucket/templates/test_template \
        --region=asia-northeast1

en conclusion

Cloud Dataflow est très pratique car il serait terrifiant de mettre en œuvre un système capable d'effectuer un tel traitement à grande échelle en peu de temps. C'est un peu cher, donc je pense qu'il est nécessaire de choisir l'utilisation pour qu'elle ne coûte pas xx millions de yens avec Cloud Dataflow.

Demain, c'est @ tetsuya0617. impatient de!

Recommended Posts

Utilisez Cloud Dataflow pour modifier dynamiquement la destination en fonction de la valeur des données et enregistrez-la dans GCS
traitement pour utiliser les données notMNIST en Python (et essayé de les classer)
[Python] Le rôle de l'astérisque devant la variable. Divisez la valeur d'entrée et affectez-la à une variable
Trier les tables BigQuery en fonction des données dans Dataflow
Utilisez Pillow pour rendre l'image transparente et en superposer une partie seulement
Comment stocker une fonction Python dans la valeur d'un dictionnaire (dict) et appeler la fonction en fonction de la clé
Utilisons les données ouvertes de "Mamebus" en Python
Comment utiliser Decorator dans Django et comment le créer
[Django 2.2] Trier et obtenir la valeur de la destination de la relation
[C / C ++] Passez la valeur calculée en C / C ++ à une fonction python pour exécuter le processus et utilisez cette valeur en C / C ++.
Renvoyez les données d'image avec Flask of Python et dessinez-les dans l'élément canvas de HTML
[Python] Modifier le contrôle du cache des objets téléchargés sur Cloud Storage
Changer la valeur de paramètre de setting.py en fonction de l'environnement de développement
Changer la destination de sortie standard en un fichier en Python
Comparaison de l'utilisation des fonctions d'ordre supérieur dans Python 2 et 3
Changer le volume de Pepper en fonction de l'environnement environnant (son)
Django a changé pour enregistrer beaucoup de données à la fois
Gratter la liste des magasins membres Go To EAT dans la préfecture de Fukuoka et la convertir en CSV
Gratter la liste des magasins membres Go To EAT dans la préfecture de Niigata et la convertir en CSV
Comment retourner les données contenues dans le modèle django au format json et les mapper sur le dépliant
Comment changer la couleur du seul bouton pressé avec Tkinter
Racler le calendrier de Hinatazaka 46 et le refléter dans Google Agenda
N'hésitez pas à changer l'étiquette de légende avec Seaborn en python
Je souhaite utiliser à la fois la clé et la valeur de l'itérateur Python
J'ai résumé comment changer les paramètres de démarrage de GRUB et GRUB2
Convertissez le résultat de python optparse en dict et utilisez-le
[Python / Jupyter] Traduisez le commentaire du programme copié dans le presse-papiers et insérez-le dans une nouvelle cellule.
Utilisons Python pour représenter la fréquence des données binaires contenues dans une trame de données dans un graphique à barres unique.
Lisez les données du lecteur NFC connecté à Raspberry Pi 3 avec Python et envoyez-les à openFrameworks avec OSC
Mettez à jour les données en les téléchargeant sur s3 d'aws avec une commande, et supprimez les données utilisées (en chemin)
De l'introduction de l'API GoogleCloudPlatform Natural Language à son utilisation
Graphique de l'historique du nombre de couches de deep learning et du changement de précision
Prédisez la quantité d'énergie utilisée en 2 jours et publiez-la au format CSV
Changer la saturation et la clarté des spécifications de couleur comme # ff000 dans python 2.5
Accédez à l'API de classement Rakuten pour enregistrer le classement de n'importe quelle catégorie au format CSV
Convertissez la feuille de calcul en CSV et importez-la dans Cloud Storage avec Cloud Functions
Rechercher le nom et les données d'une variable libre dans un objet fonction
Démarrez la webcam, prenez une image fixe et enregistrez-la localement
Implémenté dans Dataflow pour copier la structure hiérarchique de Google Drive vers Google Cloud Storage
J'ai essayé d'afficher la valeur d'altitude du DTM dans un graphique
[python] Envoyez l'image capturée de la caméra Web au serveur et enregistrez-la
[Introduction to Data Scientist] Bases du calcul scientifique, du traitement des données et comment utiliser la bibliothèque de dessins graphiques ♬ Construction d'environnement
L'arbre.plot_tree de scikit-learn était très simple et pratique, j'ai donc essayé de résumer comment l'utiliser facilement.
(Journal 1) Comment créer, parcourir et enregistrer des données dans la base de données SQL du service Microsoft Azure avec python
Une histoire sur un ingénieur qui a remarqué l'émo de la cryptographie et tente de l'implémenter en Python
GAE --Avec Python, faites pivoter l'image en fonction des informations de rotation d'EXIF et importez-la dans Cloud Storage.