When we built our data infrastructure, we adopted Airflow for our data pipelines. I ran into a few stumbling points along the way, so I am writing them down here.
We develop and operate multiple systems that use machine learning. As the number of projects grew and operations matured, the following requirements became common to all of them:
- You can access all required data sources through a single endpoint
- The same query returns the same result at any time
- Queries do not get stuck
We therefore decided it was time to build a data infrastructure. When building a data infrastructure, the original data needs to be transformed for a data warehouse or data marts.
The data pipeline itself had to meet the following requirements:
- If the logic is the same, the final data is the same, even when a run is suspended and resumed after an error, or re-run from the beginning over data that has already been created
- Data processing runs every day
- You can notice immediately that a process has failed, and it is clear where to restart from
We adopted Airflow as a tool that seemed to meet these requirements.
It corresponds to the lower part of the conceptual diagram.
To meet the above requirements, we implemented the following in Airflow:
- The current TaskInstance does not start until the previous TaskInstance has completed
- A task is skipped if the data it would create has already been created
- After past runs have been processed, execution continues on the regular schedule
- A notification is sent when the current TaskInstance succeeds or fails
The Airflow version is 1.10.5.

# Imports needed to run this snippet
from datetime import datetime, timedelta

from airflow import DAG

default_args = {
    'owner': 'Airflow',
    'start_date': datetime(2019, 12, 1),
}

dag = DAG('tutorial1', default_args=default_args,
          schedule_interval=timedelta(days=1))
With this code, DAG runs are executed starting from `execution_date` = `2019-12-01T00:00:00+00:00`, i.e. for 12/1, 12/2, 12/3, and so on; once the past runs have caught up, the DAG runs once a day. Now, assuming that today is `2019-12-06T01:00:00+00:00 (UTC)`, up to which `execution_date` have runs been executed?
The answer is that TaskInstances have been executed up to `execution_date` = `2019-12-05T00:00:00+00:00 (UTC)`.
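The reason is that the run for a given `execution_date` is only triggered after one `schedule_interval` has passed. As a rough illustration, the newest `execution_date` as of a given time can be computed like this (a minimal sketch of the rule, not Airflow's actual scheduler code):

```python
from datetime import datetime, timedelta

start_date = datetime(2019, 12, 1)       # DAG start_date
interval = timedelta(days=1)             # schedule_interval
now = datetime(2019, 12, 6, 1, 0)        # "today": 2019-12-06T01:00:00+00:00 (UTC)

# The run for execution_date X is triggered only after X + interval has passed,
# so the newest executed run satisfies execution_date <= now - interval.
n_runs = (now - interval - start_date) // interval + 1
latest_execution_date = start_date + (n_runs - 1) * interval
print(latest_execution_date)  # 2019-12-05 00:00:00
```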
I had misunderstood this and assumed that when today's date is 12/6, runs up to `execution_date` = 12/6 would be executed.
The diagram below illustrates this.
In addition, if you have requirements such as handling times in the `Asia/Tokyo` time zone inside a task, be careful, as this offset can get confusing.
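For example, converting `execution_date` to `Asia/Tokyo` inside a task might look like the following (a minimal sketch; the task name and callable are my own, not from the article). In Airflow 1.10, `execution_date` is a pendulum datetime, so it can be converted with `in_timezone`:

```python
from airflow.operators.python_operator import PythonOperator

def print_jst_date(execution_date, **_):
    # execution_date is timezone-aware (UTC); convert it to Asia/Tokyo
    jst = execution_date.in_timezone('Asia/Tokyo')
    print(jst.strftime('%Y-%m-%d %H:%M:%S'))

t_print_jst = PythonOperator(
    task_id='print_jst_date',
    python_callable=print_jst_date,
    provide_context=True,   # passes execution_date and other context as kwargs
    dag=dag)
```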
Because today's processing needed to use the execution results of the previous day's TaskInstance, today's TaskInstance had to wait for the previous day's TaskInstance to succeed. Therefore, I first used `wait_for_downstream` to wait for the result of a specific task in the previous TaskInstance.
from airflow.operators.bash_operator import BashOperator

t1 = BashOperator(
    task_id='print1',
    bash_command='echo 1',
    wait_for_downstream=True,
    dag=dag)
However, `wait_for_downstream` does not wait for the result of the entire previous DAG run. If you write the following:
t1 = BashOperator(
    task_id='print1',
    bash_command='echo 1',
    wait_for_downstream=True,
    dag=dag)

t2 = BashOperator(
    task_id='print2',
    bash_command='echo 2',
    wait_for_downstream=True,
    dag=dag)

t1 >> t2
then the current run's t1 task will start as soon as the previous run's t1 task has completed (without waiting for the previous t2 to complete). In my case, however, the current run's t1 task had to wait for both the previous run's t1 and t2 tasks. So I used `ExternalTaskSensor` and configured it to wait for the last task of the previous run.
from airflow.sensors.external_task_sensor import ExternalTaskSensor

t_check_previous_dag_run = ExternalTaskSensor(
    task_id='is_success_pre_dag_run',
    external_dag_id=dag.dag_id,
    allowed_states=['success', 'skipped'],
    external_task_id='your_last_task_id',
    execution_delta=timedelta(days=1),
    dag=dag)
# t1 is the first task you actually want to run
t_check_previous_dag_run >> t1
However, with this alone, the very first TaskInstance (the one whose `execution_date` equals `start_date`) keeps waiting for a task that does not exist, because there is no previous run, and never proceeds.
Therefore, I additionally wrote code like the following.
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator

# is_initial is a function that determines whether this is the first run;
# it is registered in user_defined_macros and used via the template below
t_check_is_initial = BranchPythonOperator(
    task_id='is_initial',
    python_callable=lambda is_initial_str: 'do_nothing' if is_initial_str == 'True' else 'is_success_pre_dag_run',  # NOQA
    op_args=['{{ is_initial(execution_date) }}'],
    dag=dag)
t_do_nothing = DummyOperator(
    task_id='do_nothing',
    dag=dag)
# Set trigger_rule='none_failed' so that t1 is not skipped
t1 = BashOperator(
    task_id='print1',
    bash_command='echo 1',
    trigger_rule='none_failed',
    dag=dag)
t_check_is_initial >> t_check_previous_dag_run >> t1
t_check_is_initial >> t_do_nothing >> t1
With code like this, the `ExternalTaskSensor` is skipped on the first execution, which avoids the problem.
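The `is_initial` macro itself is not shown above. A minimal sketch of such a macro, assuming it simply checks whether the run's `execution_date` falls on the DAG's `start_date` (this is my guess, not the actual implementation), might look like this:

```python
def is_initial(execution_date):
    # True only for the very first run, i.e. when execution_date
    # falls on the DAG's start_date (2019-12-01 in this example)
    return execution_date.strftime('%Y-%m-%d') == '2019-12-01'

dag = DAG('tutorial1', default_args=default_args,
          schedule_interval=timedelta(days=1),
          user_defined_macros={'is_initial': is_initial})
```

The templated value `{{ is_initial(execution_date) }}` is rendered as the string 'True' or 'False', which is why the branch callable compares against the string 'True'.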
This is somewhat redundant, but it now waits explicitly for the previous day's DAG run. If you know a simpler way to wait for the previous day's execution result, please let me know.
When the function passed to `python_callable` returns false, `ShortCircuitOperator` puts all of its downstream tasks into the skipped state. This means you cannot skip only the task immediately after it while still executing tasks further downstream. In the example above, the shortcircuit task (`ShortCircuitOperator`) cannot be used to skip only print2_2 and still execute the finish task.
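The example from the figure is not reproduced in code in the article; the following is a minimal sketch with task names assumed from the figure. It shows the behavior: when the callable returns false, both print2_2 and finish end up skipped, even if you only wanted to skip print2_2.

```python
from airflow.operators.python_operator import ShortCircuitOperator

t_shortcircuit = ShortCircuitOperator(
    task_id='shortcircuit',
    python_callable=lambda: False,  # false -> everything downstream is skipped
    dag=dag)

t_print2_2 = BashOperator(
    task_id='print2_2',
    bash_command='echo 2_2',
    dag=dag)

t_finish = BashOperator(
    task_id='finish',
    bash_command='echo finish',
    dag=dag)

t_shortcircuit >> t_print2_2 >> t_finish
```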
Something similar happens with `BranchPythonOperator` when the `trigger_rule` of the downstream task is left at the default `all_success`.
t0 = BranchPythonOperator(
    task_id='first_and_branch',
    python_callable=lambda: 'print1',
    dag=dag)

t1 = BashOperator(
    task_id='print1',
    bash_command='echo 1',
    dag=dag)

t2 = BashOperator(
    task_id='print2',
    bash_command='echo 2',
    dag=dag)

t3 = BashOperator(
    task_id='finish',
    bash_command='echo finish',
    dag=dag)

t0 >> t1
t0 >> t2
t1 >> t3
t2 >> t3
If the finish task's `trigger_rule` is `all_success` (the default), the finish task is put into the skipped state whenever any one of its parent tasks is skipped.
If you want the finish task to run as long as none of its parent tasks have failed, setting `trigger_rule` to `'none_failed'` as shown below makes it behave as expected.
t3 = BashOperator(
    task_id='finish',
    bash_command='echo finish',
    trigger_rule='none_failed',
    dag=dag)
Note that if the first_and_branch task were a `ShortCircuitOperator` and the result of its `python_callable` were false, all downstream tasks would be skipped regardless of their `trigger_rule`.
To send a Slack notification whenever any task fails, I initially used `default_args` and wrote it as follows.
def send_slack(context):
    # Send a Slack notification (on_failure_callback is called with the task context)
    pass

default_args = {
    'start_date': datetime(2019, 12, 1),
    'on_failure_callback': send_slack
}
However, with this approach, if the Slack notification fails to be sent for some reason, that failure does not show up anywhere on the Airflow admin UI, and as a result I sometimes did not notice that the Slack notification itself was broken. By instead sending the Slack notification explicitly as the last task, as shown below, a failure of the notification itself can be noticed on the admin UI.
t_finish = DummyOperator(
    task_id='task_finish',
    trigger_rule='none_failed',
    dag=dag)

# CustomSlackOperator is our own operator for sending Slack notifications.
# By routing the tasks with trigger_rule, a notification is sent whether the
# run succeeded or failed, instead of being skipped.
t_notification_on_success = CustomSlackOperator(
    task_id='notification_on_success',
    trigger_rule='none_failed',
    dag=dag)

t_notification_on_failed = CustomSlackOperator(
    task_id='notification_on_failed',
    is_success=False,
    trigger_rule='one_failed',
    dag=dag)

t_finish >> t_notification_on_success
t_finish >> t_notification_on_failed
Since the notification itself can fail, for example because a setting on the Slack side was changed without anyone noticing, I think it is safer to define the notification as an explicit task as well.
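`CustomSlackOperator` above is an in-house operator whose implementation is not part of the article. A minimal sketch of what such an operator could look like, assuming a plain Slack incoming webhook (the `webhook_url` parameter, the `SLACK_WEBHOOK_URL` environment variable, and the message format are all my own assumptions):

```python
import os

import requests
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

class CustomSlackOperator(BaseOperator):
    """Posts a success/failure message to a Slack incoming webhook (sketch)."""

    @apply_defaults
    def __init__(self, webhook_url=None, is_success=True, *args, **kwargs):
        super(CustomSlackOperator, self).__init__(*args, **kwargs)
        # SLACK_WEBHOOK_URL is an assumed environment variable for this sketch
        self.webhook_url = webhook_url or os.environ['SLACK_WEBHOOK_URL']
        self.is_success = is_success

    def execute(self, context):
        status = 'succeeded' if self.is_success else 'failed'
        text = 'DAG {} {} for {}'.format(
            context['dag'].dag_id, status, context['execution_date'])
        requests.post(self.webhook_url, json={'text': text})
```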
I stumbled over other details as well, but most of them could be resolved by reading the documentation carefully (and occasionally the source code).
Airflow:
- DAGs can be defined flexibly in Python code
- Past dates are processed like a batch (backfill), and after catching up it behaves like a regularly scheduled job (hard to put into words...)
It is one of the few tools that can do these things, so I think it is a good candidate when you need to run multiple complex tasks on a regular schedule.