What I stumbled upon using Airflow

Introduction

When we built our data infrastructure, we adopted Airflow as our data pipeline. At that time, there were some stumbling points, so I will write them down.

Airflow at our company

We develop and operate multiple systems that use machine learning. As the number of projects has increased and operations have progressed, it has become necessary to meet the following requirements in common.

--You can access multiple required data sources with one endpoint --The same query returns the same result at any time --Query does not get stuck

Therefore, we decided that the data infrastructure was a necessary phase and decided to build it. When building a data infrastructure, the original data needs to be processed for a data warehouse or data mart.

At that time, the data pipeline had to meet the following requirements.

--If the logic is the same, the data that is finally created will be the same even in situations such as suspending and resuming due to an error, or hitting again from the beginning with the data created. --Data processing is performed every day ――You can immediately notice that the process has failed, and it is clear where to restart.

We have adopted Airflow as a tool that seems to meet these requirements.

It is the lower part in the conceptual diagram. データ基盤概念図with_Airflow.png

Aiflow has implemented it to meet the above requirements.

--Do not move the current TaskInstance until the previous TaskInstance is completed. --Skip the task if the data you want to create has already been created --After processing the past minutes, execute at the date and time --Notify if TaskInstance for this time succeeds or fails

Premise

The version of Airflow is 1.10.5.

Stumbling 1. execution_date and execution date

default_args = {
    'owner': 'Airflow',
    'start_date': datetime(2019, 12, 1),
}
dag = DAG('tutorial1', default_args=default_args,
          schedule_interval=timedelta(days=1))

This code starts with ʻexecution_date from 2019-12-01T 00: 00: 00 + 00: 00 It is executed like 12/1, 12/2, 12/3 ..., and when the past execution is completed, it will be executed every day. At this time, assuming that today is2019-12-06T01: 00: 00 + 00: 00 (UTC)`, how long will execution_date be executed?

The answer is that TaskInstances up to 2019-12-05T 00: 00: 00 + 00: 00 (UTC) will be executed. I misunderstood that when today's date is 12/6, execution_date will run until 12/6. Below is an image diagram.

execution_date_flow.png

In addition to this, if there are requirements such as wanting to handle time in the time zone: ʻAsia / Tokyo` within the task Be careful as it can be confusing.

Stumble 2. Wait for the previous day's Task Instance

Because it was necessary to perform today's processing using the execution result of TaskInstance of the previous day Today's TaskInstance had to wait for the previous day's success of TaskInstance. Therefore, I used wait_for_downstream to wait for the result of a specific task in the previous TaskInstance.

t1 = BashOperator(
    task_id='print1',
    bash_command='echo 1',
    wait_for_downstream=True,
    dag=dag)

However, wait_for_downstream does not wait for the result of the previous entire TaskInstance.

t1 = BashOperator(
    task_id='print1',
    bash_command='echo 1',
    wait_for_downstream=True,
    dag=dag)

t2 = BashOperator(
    task_id='print2',
    bash_command='echo 2',
    wait_for_downstream=True,
    dag=dag)

t1 >> t2

If you write, the t1 task for this time will be executed when the t1 task for the previous time is completed (without waiting for the completion of t2). However, the t1 task for this time must wait for both the t1 and t2 tasks for the previous time. So I used ʻExternalTaskSensor` and set it to wait for the last task for the last time.

t_check_previous_dag_run = ExternalTaskSensor(
    task_id='is_success_pre_dag_run',
    external_dag_id=dag.dag_id,
    allowed_states=['success', 'skipped'],
    external_task_id='your_last_task_id',
    execution_delta=timedelta(days=1)
)

#t1 is the first task you want to perform
t_check_previous_dag_run >> t1

However, TaskInstance (execution_date = start_date) that works first with this description alone Will continue to wait for the completion of tasks that do not exist and will not proceed.

Therefore, further

# is_initial is a function to determine if it is the first execution user_defined_Set and used in macros
t_check_is_initial = BranchPythonOperator(
    task_id='is_initial',
    python_callable=lambda is_initial_str: 'do_nothing' if is_initial_str == 'True' else 'is_success_pre_dag_run',  # NOQA
    op_args=['{{ is_initial(execution_date) }}']
)

t_do_nothing = DummyOperator(
    task_id='do_nothing'
)

#trigger so as not to be skipped_rule='none_failed'The set
t1 = BashOperator(
    task_id='print1',
    bash_command='echo 1',
    trigger_rule='none_failed',
    dag=dag)

t_check_is_initial >> t_check_previous_dag_run >> t1
t_check_is_initial >> t_do_nothing >> t1

I wrote the code such as, and skipped ʻExternalTaskSensor` in the first execution to avoid it.

externaltasksensor.png

It's now redundant, but it's now clear to wait for the previous day's TaskInstance.

However, it is still redundant, so please teach if you know another way to wait for the execution result of the previous day.

Stumble 3. ShortCircuitOperator, Skip Status Rules

ShortCircuitOperator gives the subsequent tasks skip status to all when the function declared with python_callable returns false. Therefore, it is not possible to skip the task immediately after it, but want to execute the task further ahead.

スクリーンショット 2019-12-06 22.51.16.png

In the above example, it is not possible to use the shortcircuit task (ShortCircuitOperator) to skip print2_2 to execute the finish task.

Also, in BranchPython Operator, something similar happens when the trigger_rule of the subsequent task is set to the default ʻall_success`.

t0 = BranchPythonOperator(
    task_id='first_and_branch',
    python_callable=lambda: 'print1',
    dag=dag)

t1 = BashOperator(
    task_id='print1',
    bash_command='echo 1',
    dag=dag)

t2 = BashOperator(
    task_id='print2',
    bash_command='echo 2',
    dag=dag)

t3 = BashOperator(
    task_id='finish',
    bash_command='echo finish',
    dag=dag
)

t0 >> t1
t0 >> t2
t1 >> t3
t2 >> t3
スクリーンショット 2019-12-06 23.10.15.png

If the trigger_rule of the finish task is ʻall_sucess`, it will be in skip status if any one of the parent tasks is in skip status.

If you want the finish task to be executed if none of the parent tasks have a fail status If you set trigger_rule to'none_failed' as shown below, it will work as expected.

t3 = BashOperator(
    task_id='finish',
    bash_command='echo finish',
    trigger_rule='none_failed',
    dag=dag
)

If the first_and_branch task part is ShortCircuitOperator and the result of python_callable is false, all subsequent tasks will be in skip status regardless of trigger_rule.

Stumbling 4. Notification in case of failure

Use default_args to send slack notifications if any task fails I wrote it as follows.

def send_slack():
    #Process to issue slack notification

default_args = {
    'start_date': datetime(2019, 12, 1),
    'on_failure_callback': send_slack
}

However, in this way of writing, when the slack notification is not sent for some reason, that fact is not displayed on the management screen of airflow. As a result, I sometimes didn't notice that the slack notification task itself was broken. Therefore, by clearly stating that the slack notification will be sent at the end of the task as shown below, even if the slack notification itself has failed, it can be noticed by looking at the management screen.

t_finish = DummyOperator(
    task_id='task_finish',
    trigger_rule='none_failed',
)

#Use Operator to send your own Slack notifications
# trigger_Notifications are skipped regardless of success or failure by allocating tasks by rule
t_notification_on_success = CustomSlackOperator(
    task_id='notification_on_success',
    trigger_rule='none_failed'
)

t_notification_on_failed = CustomSlackOperator(
    task_id='notification_on_failed',
    is_success=False,
    trigger_rule='one_failed'
)

t_finish >> t_notification_on_success
t_finish >> t_notification_on_failed
スクリーンショット 2019-12-06 22.36.00.png

Since it is possible that the notification itself will fail due to the setting on the slack side being changed unknowingly. I think it is safer to specify the notification task as well.

Summary

I stumbled on other details, Most of the patterns could be solved by reading the documentation carefully (although I sometimes read the source code).

Airflow --Dag can be defined flexibly with python code --The past works like a batch, and after that, it works like a regular execution (difficult to express ...)

It is one of the few tools that can do such things. Therefore, I think it is a good candidate tool when running multiple complex tasks on a regular basis.

Recommended Posts

What I stumbled upon using Airflow
What I stumbled upon when using CodeIgniter on a Linux server
I stumbled upon trying Pylearn 2
I stumbled upon using MoviePy, so make a note
What I got into when using Tensorflow-gpu
I stumbled upon installing sentencepiece on ubuntu
[Linux] Let's talk about when I stumbled upon a symbolic link I was using.
I tried using parameterized
What I learned about AI / machine learning using Python (1)
I tried using argparse
I tried using mimesis
I tried using anytree
I tried using aiomysql
I tried using Summpy
I tried using coturn
I tried using Pipenv
I tried using matplotlib
I tried using "Anvil".
What I did when I stumbled on a Django tutorial
I tried using Hubot
I tried using ESPCN
I tried using openpyxl
I tried using Ipython
I tried using PyCaret
What I learned about AI / machine learning using Python (3)
I tried using cron
I tried using ngrok
I tried using face_recognition
I tried using Jupyter
I tried using PyCaret
I tried using Heapq
I tried using doctest
I tried using folium
I tried using jinja2
I tried using folium
What I was addicted to when using Python tornado
I tried using time-window
I stumbled upon PyUnicodeUCS4_FromStringAndSize when inserting TensorFlow with pip
What I learned about AI / machine learning using Python (2)
What I was careful about when implementing Airflow with docker-compose
What I learned about AI and machine learning using Python (4)
What has changed since I started using Visual Studio Code
What I was asked when using Random Forest in practice
I stumbled on TensorFlow (What is Out of GPU Memory)
[I tried using Pythonista 3] Introduction
I tried using easydict (memo).
I tried face recognition using Face ++
I tried using Random Forest
I tried using BigQuery ML
Where I stumbled on SQLite3
I tried using Amazon Glacier
I tried using git inspector
What I learned about Linux
[Python] I tried using OpenPose
I tried using magenta / TensorFlow
What I learned in Python
I tried using AWS Chalice
I tried using Slack emojinator
Scribble what I used when using ipython in formatting pos data
What I did when I stumbled upon mounting on MyDrive and loading an ipynb file in Google Colaboratory