When you run batch jobs with cron and something fails, you end up hunting through log files, and if the logging is too sparse you cannot even identify the cause. Introducing Airflow lets the mechanism itself prevent that kind of situation.
Do you know Airflow?
Airflow is Airbnb's open source tool for scheduling and monitoring data pipelines. Simply put, it is a high-performance cron that can build a job tree. It is open source software written for the Python 2 series and installable with pip. At re:Invent 2015, the large-scale event held by AWS, it came up that multiple companies are using Airflow, and it attracted attention. Reading Yahoo's announcement got me interested. This article is a memo of my investigation into whether Airflow should be introduced into our project.
■ I tried putting one of the project's analysis tasks on Airflow
There were few Japanese-language resources, so at first I had no idea what Airflow could actually do. I will postpone the installation steps and first cover how it is meant to be used. Looking back after about a week of investigation, I think the description in the first line of the Airflow repository represents Airflow most accurately.
Airflow is a system to programmatically author, schedule and monitor data pipelines. (Loose translation: Airflow is a system for writing data pipelines as programs, and for scheduling and monitoring them.)
If you use Airflow for anything beyond scheduling and monitoring, for example writing data-manipulation commands in it or running tasks by hand, it quickly becomes a hard system to use, so it is important to be clear about that separation of roles before introducing it.
I wrote one of our analysis tasks in Airflow. The reason the data is staged in S3 first is that if processing fails partway through and the data has to be re-fetched from the DB, the acquisition time shifts, and I do not want to hit the dump command many times a day.
■ Specifications Once a day, dump the necessary data from MySQL and save it to S3, then copy it to Google Cloud Storage and load it into BigQuery. When the data has landed in Google Cloud Storage, send an email to the people concerned.
■ Break down the specifications into tasks
1. Dump the DB and upload it to S3 (export_db_to_s3.sh)
2. Copy the dump from S3 to Google Cloud Storage (copy_s3_to_google_storage.py)
3. Load the data into BigQuery (import_big_query.py)
4. Send the notification email (send_mail.py)
■ What you should not do when designing for Airflow As is also recommended with Jenkins, the detailed definition of each job should be kept inside its own shell script or command. If you write heavyweight business logic on the Airflow side, managing updates and deploying differences becomes painful. The code on the Airflow side should be focused on the flow and the schedule.
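To make that division of labor concrete, here is a minimal sketch of what one of the external scripts, copy_s3_to_google_storage.py for example, might look like. The bucket names and paths are placeholders, and it assumes the aws CLI and gsutil are already installed and authenticated; Airflow only needs to know how to invoke it.
# -*- coding: utf-8 -*-
# Sketch of copy_s3_to_google_storage.py: bucket names and paths are placeholders.
# Assumes the aws CLI and gsutil are installed and already authenticated.
import subprocess

S3_PATH = 's3://example-bucket/export_db/latest.tsv.gz'   # placeholder
GCS_PATH = 'gs://example-bucket/export_db/latest.tsv.gz'  # placeholder
LOCAL_TMP = '/tmp/latest.tsv.gz'

def main():
    # Download the daily dump from S3, then upload it to Google Cloud Storage.
    subprocess.check_call(['aws', 's3', 'cp', S3_PATH, LOCAL_TMP])
    subprocess.check_call(['gsutil', 'cp', LOCAL_TMP, GCS_PATH])

if __name__ == '__main__':
    main()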
■ Airflow DAG implementation example Now let's program the specification in Airflow. The logic goes into export_db.py. (I found it really annoying that a file is not recognized as an Airflow task if it contains even one Japanese comment.)
export_db.py
# -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals

import os
from datetime import datetime, timedelta

from airflow.operators import PythonOperator
from airflow.models import DAG

seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
                                  datetime.min.time())

args = {
    'owner': 'airflow',
    'start_date': seven_days_ago,
}

dag = DAG(dag_id='export_db', default_args=args)

CMD_BASE_DIR = '~/analyze/{}'

# command file names
EXPORT_DB_TO_S3_CMD = 'export_db_to_s3.sh'
COPY_S3_TO_GOOGLE_CLOUD_STORAGE_CMD = 'copy_s3_to_google_storage.py'
IMPORT_BIG_QUERY_CMD = 'import_big_query.py'
SEND_MAIL_CMD = 'send_mail.py'


def do_cmd(cmd, **kwargs):
    # provide_context=True passes the Airflow context as keyword arguments,
    # so accept them with **kwargs and simply run the external command.
    os.system(cmd)


# define tasks
# 1. db to s3
task1 = PythonOperator(
    task_id='1.' + EXPORT_DB_TO_S3_CMD,
    python_callable=do_cmd,
    provide_context=True,
    op_kwargs={'cmd': CMD_BASE_DIR.format(EXPORT_DB_TO_S3_CMD)},
    dag=dag)

# 2. s3 to cloud storage
task2 = PythonOperator(
    task_id='2.' + COPY_S3_TO_GOOGLE_CLOUD_STORAGE_CMD,
    python_callable=do_cmd,
    op_kwargs={'cmd': CMD_BASE_DIR.format(COPY_S3_TO_GOOGLE_CLOUD_STORAGE_CMD)},
    dag=dag)

# 3. import bq
task3 = PythonOperator(
    task_id='3.' + IMPORT_BIG_QUERY_CMD,
    python_callable=do_cmd,
    op_kwargs={'cmd': CMD_BASE_DIR.format(IMPORT_BIG_QUERY_CMD)},
    dag=dag)

# 4. send mail
task4 = PythonOperator(
    task_id='4.' + SEND_MAIL_CMD,
    python_callable=do_cmd,
    op_kwargs={'cmd': CMD_BASE_DIR.format(SEND_MAIL_CMD)},
    dag=dag)

# define task stream
# 1 >> 2 >> 3 and 2 >> 4
task1.set_downstream(task2)
task2.set_downstream(task3)
task2.set_downstream(task4)

# define start task
run_this = task1
■ Test if the task works
python ~/airflow/dags/export_db.py
■ Restart Airflow to pick up the task I think it is a real pain that Airflow has to be restarted before a new task is reflected. This drawback is another reason not to write business logic inside the tasks.
■ Confirm in the browser that the task is registered By default, export_db is listed on the top page at [http://localhost:8080/admin/](http://localhost:8080/admin/).
■ Schedule execution To run according to the schedule defined in Python, start `airflow scheduler`. In this example the start date is set to seven days ago, so it will just keep on running, endlessly.
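Incidentally, if you do not want it to keep running like that, one option (not part of the setup above, just a sketch with placeholder dates) is to give the DAG an explicit schedule_interval and an end_date:
# Sketch: run once a day and schedule nothing after end_date (dates are placeholders).
from datetime import datetime, timedelta
from airflow.models import DAG

args = {
    'owner': 'airflow',
    'start_date': datetime(2015, 12, 1),   # placeholder
    'end_date': datetime(2015, 12, 31),    # placeholder
}

dag = DAG(dag_id='export_db',
          default_args=args,
          schedule_interval=timedelta(days=1))  # once a day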
■ Image 1. Graph view of the specified task
■ Image 2. Top DAG list
■ Image 3. Tree view of tasks
Here are the merits of introducing it, compared with running everything on cron. A lot of this is subjective.
■ 1. Execution time is visualized per task I think it is great that it collects execution-time summaries and displays them as graphs in a clean web view. Getting that kind of summary is hard when you batch-execute with cron and just dump output into a log.
■ 2. The error logs are easy to read You can see from the web UI which task failed, at what time, and what was written to standard error. That is a big difference from an implementation that keeps spitting out log files from cron that are not even rotated. It is wonderful that the mechanism itself prevents the situation where an error occurs, you go hunting through log files, and the logging turns out to be too sparse to identify the cause.
■ 3. You can build a job tree You can clearly define dependencies such as "run task B after task A has finished".
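In the DAG above that is exactly what set_downstream expresses; a minimal sketch, reusing the dag object from export_db.py (the bash commands here are placeholders):
from airflow.operators import BashOperator

task_a = BashOperator(task_id='task_a', bash_command='echo A', dag=dag)
task_b = BashOperator(task_id='task_b', bash_command='echo B', dag=dag)
task_a.set_downstream(task_b)  # task_b starts only after task_a has succeeded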
■ 4. Changes to the tree and to execution timing can be recorded in git Since the schedule and the tree are programmed in Python, managing the code in git keeps the full change history.
<del>Airflow cannot run tasks manually. Since the direction it aims for as a product is different from Jenkins, I think Airflow deliberately chose not to implement this.</del> (It turns out tasks can be run manually by introducing the CeleryExecutor. See issues/51 for why the CeleryExecutor is required.) In Airflow, everything down to the DAG name is defined in the Python code. From the web GUI you can only monitor the execution status; you cannot change the behavior of a task at all. I think that is deliberate. Airflow is a sharply focused product, so if you misunderstand this point and try to operate it as a Jenkins replacement, you may well find it unusable.
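For reference, the executor is switched in airflow.cfg; a minimal excerpt showing only the relevant line (a real CeleryExecutor setup also needs a message broker configured in the [celery] section):
# airflow.cfg (excerpt)
[core]
executor = CeleryExecutor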
It is a fully automated task: normally you are not even aware of its existence, and only when a failure happens, maybe once a month, do you go and check where in the task the error occurred. I felt Airflow is well suited to that kind of use.
In my local environment, where MySQL was already running, I was able to install it, start it, and check the operation in a browser within about ten minutes.
install I installed it while reading the official README.rst.
mkvirtualenv airflow
mkdir ~/airflow
cd ~/airflow/
pip install airflow[mysql]
export AIRFLOW_HOME=~/airflow
airflow initdb
run
airflow webserver -p 8080
Access [http://localhost:8080/](http://localhost:8080/admin/) in your browser
mkdir ~/airflow/dags
touch ~/airflow/dags/__init__.py
touch ~/airflow/dags/export_db.py
# Write the task definition in export_db.py
python ~/airflow/dags/export_db.py
airflow list_dags
airflow list_tasks export_db
airflow list_tasks export_db --tree
airflow scheduler