Lorsque Airflow est introduit, une erreur se produit dans le traitement par lots de cron, et à la suite de la capture du fichier journal, il devient possible d'éviter des choses telles que la sortie du journal est trop douce et la cause ne peut pas être identifiée.
Connaissez-vous Airflow?
Outil de planification et de surveillance du pipeline de données open source d'Airbnb. En termes simples, un cron hautes performances qui peut créer une arborescence de tâches. C'est un logiciel open source développé dans la série Python2 qui peut être installé pip. Lors de l'événement à grande échelle re: Invent 2015 organisé par AWS chaque année, plusieurs entreprises ont annoncé qu'elles utilisaient Airflow et ont attiré l'attention. J'étais intéressé par la lecture de Yahoo Announcement. Cet article est un mémo qui examine et vérifie si Airflow doit être introduit dans le projet.
■ J'ai essayé de mettre la tâche d'analyse du projet sur Airflow
Il y avait peu de matériaux japonais, donc au début je ne savais pas ce qu'Airflow pouvait faire. Je voudrais reporter la méthode d'installation et compléter d'abord les informations sur l'utilisation. Après l'avoir examiné pendant environ une semaine, en regardant en arrière, je pense que la description écrite dans la première ligne du référentiel Airflow décrit le mieux Airflow.
Airflow is a system to programmatically author, schedule and monitor data pipelines.(Traduction super:Airflow est un système qui fournit les fonctions suivantes par programmation. Exemple:Calendrier du pipeline de données, surveillance, etc.)
Si Airflow est utilisé à des fins autres que la planification et la surveillance, par exemple, s'il est utilisé pour écrire des commandes d'opération de données ou exécuter manuellement des tâches, il deviendra un système difficile à utiliser immédiatement, il est donc important de séparer l'utilisation avant l'introduction. est.
J'ai écrit une tâche d'analyse dans Airflow. La raison pour laquelle les données sont placées une fois dans S3 est que si le processus échoue au milieu, le temps d'acquisition sera décalé s'il est réacquis de la base de données, et je ne veux pas appuyer sur la commande de vidage plusieurs fois par jour.
■ Spécifications Après avoir vidé les données nécessaires de MySQL une fois par jour et les avoir enregistrées dans S3, copiez-les dans Google Cloud Storage et saisissez les données dans BigQuery. Envoyez un e-mail aux personnes concernées lorsque les données sont installées dans Google Cloud Storage.
■ Décomposer les spécifications en tâches
■ Ce que vous ne devez pas faire lors de la conception d'Airflow Comme recommandé par jenkins, les définitions de détail des tâches doivent être agrégées dans chaque shell ou commande. Si vous écrivez la logique métier gorigori du côté Airflow, il sera difficile de gérer les mises à jour et de refléter la différence. Le traitement à programmer du côté Airflow doit être axé sur le débit et la planification.
■ Exemple d'implémentation du tag Airflow Nous programmerons les spécifications avec Airflow. Écrivez la logique dans export_db.py (je pensais que l'implémentation qui ne serait pas reconnue comme une tâche Airflow si même un commentaire japonais était écrit était vraiment un gaspillage.)
export_db.py
# -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals
import os
from airflow.operators import PythonOperator
from airflow.models import DAG
from datetime import datetime, timedelta
seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
datetime.min.time())
args = {
'owner': 'airflow',
'start_date': seven_days_ago,
}
dag = DAG(dag_id='export_db', default_args=args)
CMD_BASE_DIR = '~/analyze/{}'
# cmd file name
EXPORT_DB_TO_S3_CMD = 'export_db_to_s3.sh'
COPY_S3_TO_GOOGLE_CLOUD_STORAGE_CMD = 'copy_s3_to_google_storage.py'
IMPORT_BIG_QUERY_CMD = 'import_big_query.py'
SEND_MAIL_CMD = 'send_mail.py'
def do_cmd(cmd):
os.system(cmd)
# define task
# 1. db to s3
task1 = PythonOperator(
task_id='1.' + EXPORT_DB_TO_S3_CMD,
python_callable=do_cmd,
provide_context=True,
op_kwargs={'cmd': CMD_BASE_DIR.format(EXPORT_DB_TO_S3_CMD)},
dag=dag)
# 2. s3 to cloud storage
task2 = PythonOperator(
task_id='2.' + COPY_S3_TO_GOOGLE_CLOUD_STORAGE_CMD,
python_callable=do_cmd,
op_kwargs={'cmd': CMD_BASE_DIR.format(COPY_S3_TO_GOOGLE_CLOUD_STORAGE_CMD)},
dag=dag)
# 3. import bq
task3 = PythonOperator(
task_id='3.' + IMPORT_BIG_QUERY_CMD,
python_callable=do_cmd,
op_kwargs={'cmd': CMD_BASE_DIR.format(IMPORT_BIG_QUERY_CMD)},
dag=dag)
# 4. send mail
task4 = PythonOperator(
task_id='4.' + SEND_MAIL_CMD,
python_callable=do_cmd,
op_kwargs={'cmd': CMD_BASE_DIR.format(SEND_MAIL_CMD)},
dag=dag)
# define task stream
# 1 >> 2 >> 3 and 2 >> 4
task1.set_downstream(task2)
task2.set_downstream(task3)
task2.set_downstream(task4)
# define start task
run_this = task1
■ Tester si la tâche fonctionne
python ~/airflow/dags/export_db.py
■ Redémarrez Airflow pour refléter la tâche Je pense que le fait qu'Airflow doit être redémarré pour refléter la tâche est vraiment un gaspillage. En raison de cet inconvénient, il n'est pas recommandé d'écrire une logique métier dans la tâche.
■ Confirmez avec le navigateur que la tâche est enregistrée Par défaut, export_db est enregistré dans [http: // localhost: 8080 / admin /](http: // localhost: 8080 / admin /) sur la page supérieure.
■ Planifier l'exécution Pour exécuter selon le planning défini en python, commencez par ʻairflow scheduler`. Dans ce cas, la date et l'heure de début sont définies il y a 7 jours, de sorte qu'il continuera à se déplacer dans une boucle infinie sans fin.
■ Image 1. Vue graphique de la tâche spécifiée
■ Image 2. Liste des principaux DAG
■ Image 3. Arborescence des tâches
C'est un mérite d'introduction par rapport à l'opération avec cron. C'est beaucoup de subjectivité.
■ 1. Visualisation du temps d'exécution de chaque tâche Je pense que c'est génial de prendre un résumé du temps d'exécution et de l'afficher sous forme de graphique dans une belle vue Web. Il est également difficile d'obtenir un résumé si vous exécutez un lot avec cron et que vous le crachez dans le journal.
■ 2. Le journal des erreurs est de toute façon facile à voir Vous pouvez voir sur le Web quel type d'erreur s'est produit dans la tâche exécutée à quelle heure et quelle minute, et quel type d'erreur standard a été généré. Il y a une grande différence par rapport à l'implémentation qui continue de cracher des fichiers journaux qui ne sont même pas tournés par cron. Je pense que c'est merveilleux qu'une erreur puisse se produire et que le fichier journal soit recherché, et par conséquent, la sortie du journal est si douce que la cause ne peut pas être identifiée.
■ 3. Peut configurer l'arborescence des travaux Il peut être clairement défini comme une exécution de tâche B une fois qu'une tâche est terminée.
■ 4. Les changements d'arborescence et les changements d'heure d'exécution peuvent être enregistrés dans git Puisque le planning et l'arborescence seront programmés avec python, si vous le gérez avec git, l'historique des modifications restera.
Airflow ne peut pas exécuter manuellement les tâches. Puisque la direction que nous visons en tant que produit est différente de jenkins, je pense qu'Airflow n'ose pas l'implémenter. </ del> (Dans Airflow, il était possible d'exécuter manuellement des tâches en introduisant CeleryExecutor. Voir les problèmes ici pour savoir pourquoi CeleryExecutor est requis issues / 51)) Dans Airflow, même la définition du nom de la balise est définie dans la commande python. Vous ne pouvez surveiller l'état d'exécution qu'à partir de WebGUI et vous ne pouvez pas du tout modifier le comportement de la tâche. Je pense que je fais ça aussi. Airflow est un produit pointu, donc si vous ne comprenez pas ce domaine et essayez de l'utiliser comme un produit alternatif à Jenkins, il est possible que vous ne puissiez pas l'utiliser.
C'est une tâche entièrement automatisée, et c'est comme là où l'erreur s'est produite dans la tâche uniquement lorsque l'échec s'est produit au cours du premier mois sans être conscient de son existence. J'ai pensé qu'il convenait à de telles applications.
Dans mon environnement local où mysql est en cours d'exécution, j'ai pu l'installer et le démarrer en 10 minutes et vérifier le fonctionnement avec un navigateur.
install Official Readme.rst J'ai installé en lisant le fichier.
mkvirtualenv airflow
mkdir ~/airflow
cd ~/airflow/
pip install airflow[mysql]
export AIRFLOW_HOME=~/airflow
airflow initdb
run
airflow webserver -p 8080
Accédez à [http: // localhost: 8080 /](http: // localhost: 8080 / admin /) avec un navigateur
mkdir ~/airflow/dags
touch ~/airflow/dags/__init__.py
touch ~/airflow/dags/export_db.py
# export_db.Ecrire une définition de tâche dans py
python ~/airflow/dags/export_db.py
airflow list_dags
airflow list_tasks export_db
airflow list_tasks export_db --tree
airflow scheduler