Cet article est l'article du 23ème jour du Calendrier de l'Avent Classi 2019.
Bonjour, c'est @tomoyanamekawa de données AI faisant partie de Classi. Je crée généralement une plateforme d'analyse de données sur GCP.
Récemment, il y a eu un cas où "je veux diviser les données dans BigQuery en fichiers en fonction des valeurs à l'intérieur et les enregistrer dans GCS", et à ce moment-là, j'ai été pris en charge par Cloud Dataflow. Il semble y avoir une demande de la part d'autres personnes, et il y avait peu d'exemples d'implémentation en Python, donc je vais le résumer.
Exécutez quotidiennement le processus d'exportation d'une table spécifique dans BigQuery vers Google Cloud Storage (GCS). Cependant, je souhaite modifier le répertoire de destination de l'enregistrement en fonction de la valeur d'une certaine colonne. Le format de fichier est json.
Tableau des réservations dans BigQuery Je veux enregistrer dans GCS séparément pour chaque date / shop_id comme celui-ci.
C'est un service qui peut effectuer un traitement ETL sans serveur fourni par GCP. En coulisse, Apache Beam est en cours d'exécution, on peut donc dire que c'est un service qui peut utiliser Apache Beam sans serveur. Étant donné que le traitement parallèle peut être effectué, même des données à grande échelle peuvent être traitées à grande vitesse.
Il prend en charge à la fois le traitement de flux et le traitement par lots, mais cette fois, nous utiliserons le traitement par lots. Pour plus d'informations, veuillez visiter la page officielle.
Pour ceux qui veulent pouvoir l'utiliser pour le moment, je pense que cette procédure dans le matériel de présentation de M. Yuzutaso est bonne (je l'attrape aussi avec ça) Il a été téléchargé).
Cloud Dataflow utilise ce que l'on appelle un "modèle" pour créer un processus ETL. Pour un traitement général, utilisez les Modèles fournis par Google pour faciliter la tâche sur une base graphique. Je peux le faire. Cependant, je ne peux pas faire ce que je veux faire cette fois, je vais donc créer moi-même un modèle personnalisé.
À propos, Java ou Python peut être utilisé comme langage de programmation. Cette fois, j'écrirai en Python, mais Java a plus de fonctions et de documentation, donc si vous ou les membres de votre équipe pouvez écrire Java et qu'il n'y a pas de problèmes de maintenance, je pense que Java est meilleur.
Voici le contenu du modèle personnalisé.
test_template.py
import os
import json
import datetime
import apache_beam as beam
from apache_beam.io import fileio
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions, GoogleCloudOptions
class JsonSink(fileio.TextSink):
def write(self, record):
self._fh.write(json.dumps(record).encode('utf8'))
self._fh.write('\n'.encode('utf8'))
if __name__ == '__main__':
now = datetime.datetime.now().strftime('%Y%m%d')
project_id = 'your_project'
dataset_name = 'your_dataset'
table_name = 'your_table'
bucket_name = 'your_bucket'
#option
pipeline_options = PipelineOptions()
google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
google_cloud_options.project = project_id
google_cloud_options.job_name = 'myjob'
google_cloud_options.staging_location = f'gs://{bucket_name}/staging'
google_cloud_options.temp_location = f'gs://{bucket_name}/temp'
google_cloud_options.template_location = f'gs://{bucket_name}/templates/test_template'
pipeline_options.view_as(StandardOptions).runner = 'DataflowRunner'
#Créer un pipeline
pipeline = beam.Pipeline(options=pipeline_options)
(pipeline
| 'read' >> beam.io.Read(beam.io.BigQuerySource(
project=project_id,
use_standard_sql=True,
query=f'select * from `{project_id}.{dataset_name}.{table_name}`'
))
| 'write' >> beam.io.fileio.WriteToFiles(
path=f'gs://{bucket_name}/{now}',
destination=lambda record, table_name=table_name: f"shop_id_{record['shop_id']}/",
sink=JsonSink(),
file_naming=beam.io.fileio.destination_prefix_naming()
)
)
pipeline.run()
Le fait est que nous utilisons cette fonction Dynamic Destinations.
Puisque la valeur de chaque enregistrement est stockée dans la variable appelée record, vous pouvez changer la destination (nom du fichier de destination) pour chaque enregistrement avec record ['shop_id']
.
Puisque le modèle créé doit être placé sur GCS, exécutez cette commande.
python -m test_template
Ensuite, le modèle sera placé à l'emplacement spécifié par google_cloud_options.template_location
.
Vous pouvez également définir l'emplacement du modèle lors de l'exécution.
Cloud Dataflow lui-même n'a pas de fonction de planificateur, il doit donc être exécuté en externe pour fonctionner quotidiennement. Par conséquent, cette fois, nous activerons l'exécution sans serveur avec Cloud Scheduler + Cloud Pub / Sub + Cloud Functions.
Enregistrez le script suivant dans Cloud Functions. Ce script exécutera le modèle personnalisé pour vous.
from googleapiclient.discovery import build
def main(data, context):
job = 'my_job'
dataflow = build('dataflow', 'v1b3')
request = dataflow.projects().templates().launch(
projectId='your_project',
gcsPath='gs://your_bucket/templates/test_template'
)
response = request.execute()
Les déclencheurs Cloud Functions sont Pub / Sub.
De plus, lorsque vous utilisez Pub / Sub comme déclencheur, il est nécessaire de recevoir deux arguments, il est donc défini comme main (données, contexte)
.
Tout ce que vous avez à faire est de créer un sujet Pub / Sub qui est le déclencheur et de publier ce sujet quotidiennement à partir de Cloud Scheduler.
Si vous configurez Cloud Composer ou un serveur et que vous le planifiez avec d'autres moteurs de flux de travail ou cron, vous pouvez exécuter un modèle personnalisé à partir de la commande gcloud ci-dessous.
gcloud dataflow jobs run my_job \
--gcs-location gs://your_bucket/templates/test_template \
--region=asia-northeast1
Cloud Dataflow est très pratique car il serait terrifiant de mettre en œuvre un système capable d'effectuer un tel traitement à grande échelle en peu de temps. C'est un peu cher, donc je pense qu'il est nécessaire de choisir l'utilisation pour qu'elle ne coûte pas xx millions de yens avec Cloud Dataflow.
Demain, c'est @ tetsuya0617. impatient de!