Learn Digdag from Digdag Official Documentation - Workflow definition

Target

A translation of the Workflow definition page of the official Digdag documentation, plus some extras. The final goal is to build a batch in Rails using Digdag's Ruby API. http://docs.digdag.io/workflow_definition.html

# Table of contents

- Getting started
- Architecture
- Concepts
- Workflow definition
- Scheduling workflow
- Operators
- Command reference
- Language API - Ruby
- Change the setting value for each environment with Digdag (RubyOnRails)
- Batch implementation in a RubyOnRails environment using Digdag

Workflow definition

**Workflow definition: .dig files**

A Digdag workflow is defined in a file with the `.dig` extension. The name of the file is the name of the workflow.

For example, creating a hello_world workflow means creating a `hello_world.dig` file. The contents of the file are as follows:

hello_world.dig


timezone: Asia/Tokyo

+step1:
  sh>: tasks/shell_sample.sh

+step2:
  py>: tasks.MyWorkflow.step2
  param1: this is param1

+step3:
  rb>: MyWorkflow.step3
  require: tasks/ruby_sample.rb

The `timezone` parameter configures the time zone of the workflow; it affects session timestamp variables and scheduling. The default time zone is `UTC`. Examples of other valid time zones are `America/Los_Angeles`, `Europe/Berlin`, and `Asia/Tokyo`.

**The + sign means a task**

Key names that start with the + sign are tasks. Tasks are executed in order from the top. A task can be nested as a child of another task. In the example above, `+step2` runs after `+step1`.

**operators>**

A task with a `type>: command` or `_type: NAME` parameter executes an action. You can choose from a variety of operators, such as running shell scripts, calling Python methods, or sending email. See the Operators page for a list of built-in operators.

`foo>: bar` is equivalent to the following two parameters:

_type: foo
_command: bar
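
For example, `+step1` from hello_world.dig above could also be written in this expanded form, a sketch derived directly from the equivalence just shown:

+step1:
  _type: sh
  _command: tasks/shell_sample.sh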

**Using ${variables}**

Workflows can embed variables using the `${...}` syntax. You can use built-in variables or define your own. See the documentation for the list of built-in variables: http://docs.digdag.io/workflow_definition.html
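
For example, a built-in variable such as `session_date` can be embedded directly in a command. A minimal sketch:

+print_date:
  sh>: echo session_date is ${session_date}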

**Calculating variables**

You can calculate variables using basic JavaScript expressions inside the `${...}` syntax. A common use case is formatting a timestamp into another format. Digdag comes bundled with Moment.js for time calculations.

my_workflow.dig


timezone: Asia/Tokyo

+format_session_time:
  echo>: ${moment(session_time).format("YYYY-MM-DD HH:mm:ss Z")}

+format_in_utc:
  echo>: ${moment(session_time).utc().format("YYYY-MM-DD HH:mm:ss")}

+format_tomorrow:
  echo>: ${moment(session_time).add(1, 'days').format("LLL")}

+get_execution_time:
  echo>: ${moment().format("YYYY-MM-DD HH:mm:ss Z")}

result


2020-07-10 22:05:34 +0900 [INFO](0017@[0:default]+my_workflow+format_session_time): echo>: 2020-07-10 09:00:00 +09:00
2020-07-10 09:00:00 +09:00
2020-07-10 22:05:35 +0900 [INFO](0017@[0:default]+my_workflow+format_in_utc): echo>: 2020-07-10 00:00:00
2020-07-10 00:00:00
2020-07-10 22:05:35 +0900 [INFO](0017@[0:default]+my_workflow+format_tomorrow): echo>: July 11, 2020 9:00 AM
July 11, 2020 9:00 AM
2020-07-10 22:05:36 +0900 [INFO](0017@[0:default]+my_workflow+get_execution_time): echo>: 2020-07-10 22:05:36 +09:00
2020-07-10 22:05:36 +09:00

**Defining variables**

There are three ways to define variables:

1. Using the `_export:` parameter in YAML
2. Setting variables programmatically using the language API
3. Starting a session with variables

**Using the _export: parameter**

In a YAML file, the `_export:` directive defines variables. This is useful for loading static configuration such as database hostnames.

If a task has an `_export:` directive, the task and its children can use the variables, because `_export:` defines variables within that scope.

my_workflow.dig


_export:
  foo: 1

+prepare:
  sh>: echo foo:${foo} bar:${bar}

+analyze:
  _export:
    bar: 2

  +step1:
    sh>: echo foo:${foo} bar:${bar}

+dump:
  sh>: echo foo:${foo} bar:${bar}

result


2020-07-10 22:18:11 +0900 [INFO](0017@[0:default]+my_workflow+prepare): sh>: echo foo:1
foo:1
2020-07-10 22:18:11 +0900 [INFO](0017@[0:default]+my_workflow+analyze+step1): sh>: echo foo:1 bar:2
foo:1 bar:2
2020-07-10 22:18:12 +0900 [INFO](0017@[0:default]+my_workflow+dump): sh>: echo foo:1
foo:1

All tasks can use `foo=1`, but only `+step1` (and `+analyze`) can use `bar=2`.

**Using the API**

You can set variables programmatically using the language API. For example, the Python API provides `digdag.env.export` and `digdag.env.store`. This series focuses on the Ruby API; the API is covered in a separate chapter, so it is not explained in detail here.

import digdag

class MyWorkflow(object):
  def prepare(self):
    digdag.env.store({"my_param": 2})

  def analyze(self, my_var):
    print("my_var should be 2: %d" % my_var)

**Starting a session with variables**

You can set variables when you start a new workflow session. Use `-p KEY=VALUE` multiple times to set multiple variables. There is no sample in the documentation, so let's create a workflow that simply prints parameters received from outside to the console.

my_workflow.dig


+print_my_var1:
  sh>: echo my_var1:${my_var1}

+print_my_var2:
  sh>: echo my_var2:${my_var2}

`my_var1` and `my_var2` are obtained from the run-time parameters. Execute the workflow as follows:

digdag run my_workflow.dig --rerun -p my_var1=foo -p my_var2=bar

result


2020-07-11 10:29:33 +0900 [INFO](0017@[0:default]+my_workflow+print_my_var1): sh>: echo my_var1:foo
my_var1:foo
2020-07-11 10:29:33 +0900 [INFO](0017@[0:default]+my_workflow+print_my_var2): sh>: echo my_var2:bar
my_var2:bar

**!include another file**

You can split your YAML file into smaller files to organize complex workflows, and use `!include` to include the split files in your workflow.

my_workflow.dig


+task1:
    !include : 'tasks/task1.dig'
+task2:
    !include : 'tasks/task2.dig'

task1.dig


+task1:
    sh>: echo Task 1

task2.dig


+task2:
    sh>: echo Task 2

When you run my_workflow.dig, you can see that the tasks in task1.dig and task2.dig are executed.




result


$ digdag run my_workflow.dig --rerun
2020-07-11 10:40:39 +0900 [INFO](0017@[0:default]+my_workflow+task1+task1): sh>: echo Task 1
Task 1
2020-07-11 10:40:40 +0900 [INFO](0017@[0:default]+my_workflow+task2+task2): sh>: echo Task 2
Task 2

**Parallel execution**

If `_parallel: true` is set for a group, the tasks in the group run in parallel.




my_workflow.dig

+prepare:
  # +data1, +data2, and +data3 run in parallel.
  _parallel: true

  +data1:
    sh>: echo data1
  +data2:
    sh>: echo data2
  +data3:
    sh>: echo data3
+analyze:
  sh>: echo analyze

data1, data2, and data3 are output in parallel.

Execution result


$ digdag run my_workflow.dig --rerun
2020-07-11 10:50:45 +0900 [INFO](0018@[0:default]+my_workflow+prepare+data2): sh>: echo data2
2020-07-11 10:50:45 +0900 [INFO](0017@[0:default]+my_workflow+prepare+data1): sh>: echo data1
2020-07-11 10:50:45 +0900 [INFO](0019@[0:default]+my_workflow+prepare+data3): sh>: echo data3
data1
data2
data3
2020-07-11 10:50:45 +0900 [INFO](0019@[0:default]+my_workflow+analyze): sh>: echo analyze
analyze

If `_background: true` is set on a task or group, it runs in parallel with the previous task. The next task waits for the background task or group to complete.

my_workflow.dig


+prepare:
  +data1:
    sh>: echo data1

  # +data1 and +data2 run in parallel.
  +data2:
    _background: true
    sh>: echo data2

  # +data3 runs after +data1 and +data2.
  +data3:
    sh>: echo data3

+analyze:
  sh>: echo analyze

data1 and data2 are executed in parallel.

result


$ digdag run my_workflow.dig --rerun

2020-07-11 11:00:06 +0900 [INFO](0018@[0:default]+my_workflow+prepare+data2): sh>: echo data2
2020-07-11 11:00:06 +0900 [INFO](0017@[0:default]+my_workflow+prepare+data1): sh>: echo data1
data2
data1
2020-07-11 11:00:06 +0900 [INFO](0018@[0:default]+my_workflow+prepare+data3): sh>: echo data3
data3
2020-07-11 11:00:06 +0900 [INFO](0018@[0:default]+my_workflow+analyze): sh>: echo analyze

**Retrying failed tasks automatically**

If `_retry: N` (where N is an integer: 1, 2, 3, ...) is set for a group, the group is retried from the beginning when one or more of its child tasks fail.

my_workflow.dig


+prepare:
  # If +erase_table, +load_data, or +check_loaded_data fail, it retries from
  # +erase_table again.
  _retry: 3

  +erase_table:
    sh>: echo erase_table

  +load_data:
    sh>: echo load_data
  +check_loaded_data:
    sh>: tasks/error.sh

+analyze:
  sh>: echo analyze

error.sh


#!/bin/bash
# Always fail so that the retry behavior can be observed.
exit 1

In total there are 4 errors: the initial run fails and so do the 3 retries. If a retry succeeded partway through, the number of errors would be lower.

result


$ digdag run my_workflow.dig --rerun
2020-07-11 11:21:21 +0900 [INFO](0017@[0:default]+my_workflow+prepare+erase_table): sh>: echo erase_table
erase_table
2020-07-11 11:21:21 +0900 [INFO](0017@[0:default]+my_workflow+prepare+load_data): sh>: echo load_data
load_data
2020-07-11 11:21:22 +0900 [INFO](0017@[0:default]+my_workflow+prepare+check_loaded_data): sh>: tasks/error.sh
2020-07-11 11:21:22 +0900 [ERROR](0017@[0:default]+my_workflow+prepare+check_loaded_data): Task failed with unexpected error: Command failed with code 1
java.lang.RuntimeException: Command failed with code 1
        at io.digdag.standards.operator.ShOperatorFactory$ShOperator.runTask(ShOperatorFactory.java:143)
        at io.digdag.util.BaseOperator.run(BaseOperator.java:35)
        at io.digdag.core.agent.OperatorManager.callExecutor(OperatorManager.java:315)
        at io.digdag.cli.Run$OperatorManagerWithSkip.callExecutor(Run.java:705)
        at io.digdag.core.agent.OperatorManager.runWithWorkspace(OperatorManager.java:257)
        at io.digdag.core.agent.OperatorManager.lambda$runWithHeartbeat$2(OperatorManager.java:137)
        at io.digdag.core.agent.LocalWorkspaceManager.withExtractedArchive(LocalWorkspaceManager.java:25)
        at io.digdag.core.agent.OperatorManager.runWithHeartbeat(OperatorManager.java:135)
        at io.digdag.core.agent.OperatorManager.run(OperatorManager.java:119)
        at io.digdag.cli.Run$OperatorManagerWithSkip.run(Run.java:687)
        at io.digdag.core.agent.MultiThreadAgent.lambda$null$0(MultiThreadAgent.java:127)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
2020-07-11 11:21:22 +0900 [INFO](0017@[0:default]+my_workflow+prepare+erase_table): sh>: echo erase_table
erase_table
2020-07-11 11:21:22 +0900 [INFO](0017@[0:default]+my_workflow+prepare+load_data): sh>: echo load_data
load_data
2020-07-11 11:21:23 +0900 [INFO](0017@[0:default]+my_workflow+prepare+check_loaded_data): sh>: tasks/error.sh
2020-07-11 11:21:23 +0900 [ERROR](0017@[0:default]+my_workflow+prepare+check_loaded_data): Task failed with unexpected error: Command failed with code 1
java.lang.RuntimeException: Command failed with code 1
        at io.digdag.standards.operator.ShOperatorFactory$ShOperator.runTask(ShOperatorFactory.java:143)
        at io.digdag.util.BaseOperator.run(BaseOperator.java:35)
        at io.digdag.core.agent.OperatorManager.callExecutor(OperatorManager.java:315)
        at io.digdag.cli.Run$OperatorManagerWithSkip.callExecutor(Run.java:705)
        at io.digdag.core.agent.OperatorManager.runWithWorkspace(OperatorManager.java:257)
        at io.digdag.core.agent.OperatorManager.lambda$runWithHeartbeat$2(OperatorManager.java:137)
        at io.digdag.core.agent.LocalWorkspaceManager.withExtractedArchive(LocalWorkspaceManager.java:25)
        at io.digdag.core.agent.OperatorManager.runWithHeartbeat(OperatorManager.java:135)
        at io.digdag.core.agent.OperatorManager.run(OperatorManager.java:119)
        at io.digdag.cli.Run$OperatorManagerWithSkip.run(Run.java:687)
        at io.digdag.core.agent.MultiThreadAgent.lambda$null$0(MultiThreadAgent.java:127)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
2020-07-11 11:21:23 +0900 [INFO](0017@[0:default]+my_workflow+prepare+erase_table): sh>: echo erase_table
erase_table
2020-07-11 11:21:23 +0900 [INFO](0017@[0:default]+my_workflow+prepare+load_data): sh>: echo load_data
load_data
2020-07-11 11:21:24 +0900 [INFO](0017@[0:default]+my_workflow+prepare+check_loaded_data): sh>: tasks/error.sh
2020-07-11 11:21:24 +0900 [ERROR](0017@[0:default]+my_workflow+prepare+check_loaded_data): Task failed with unexpected error: Command failed with code 1
java.lang.RuntimeException: Command failed with code 1
        at io.digdag.standards.operator.ShOperatorFactory$ShOperator.runTask(ShOperatorFactory.java:143)
        at io.digdag.util.BaseOperator.run(BaseOperator.java:35)
        at io.digdag.core.agent.OperatorManager.callExecutor(OperatorManager.java:315)
        at io.digdag.cli.Run$OperatorManagerWithSkip.callExecutor(Run.java:705)
        at io.digdag.core.agent.OperatorManager.runWithWorkspace(OperatorManager.java:257)
        at io.digdag.core.agent.OperatorManager.lambda$runWithHeartbeat$2(OperatorManager.java:137)
        at io.digdag.core.agent.LocalWorkspaceManager.withExtractedArchive(LocalWorkspaceManager.java:25)
        at io.digdag.core.agent.OperatorManager.runWithHeartbeat(OperatorManager.java:135)
        at io.digdag.core.agent.OperatorManager.run(OperatorManager.java:119)
        at io.digdag.cli.Run$OperatorManagerWithSkip.run(Run.java:687)
        at io.digdag.core.agent.MultiThreadAgent.lambda$null$0(MultiThreadAgent.java:127)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
2020-07-11 11:21:24 +0900 [INFO](0017@[0:default]+my_workflow+prepare+erase_table): sh>: echo erase_table
erase_table
2020-07-11 11:21:24 +0900 [INFO](0017@[0:default]+my_workflow+prepare+load_data): sh>: echo load_data
load_data
2020-07-11 11:21:24 +0900 [INFO](0017@[0:default]+my_workflow+prepare+check_loaded_data): sh>: tasks/error.sh
2020-07-11 11:21:24 +0900 [ERROR](0017@[0:default]+my_workflow+prepare+check_loaded_data): Task failed with unexpected error: Command failed with code 1
java.lang.RuntimeException: Command failed with code 1
        at io.digdag.standards.operator.ShOperatorFactory$ShOperator.runTask(ShOperatorFactory.java:143)
        at io.digdag.util.BaseOperator.run(BaseOperator.java:35)
        at io.digdag.core.agent.OperatorManager.callExecutor(OperatorManager.java:315)
        at io.digdag.cli.Run$OperatorManagerWithSkip.callExecutor(Run.java:705)
        at io.digdag.core.agent.OperatorManager.runWithWorkspace(OperatorManager.java:257)
        at io.digdag.core.agent.OperatorManager.lambda$runWithHeartbeat$2(OperatorManager.java:137)
        at io.digdag.core.agent.LocalWorkspaceManager.withExtractedArchive(LocalWorkspaceManager.java:25)
        at io.digdag.core.agent.OperatorManager.runWithHeartbeat(OperatorManager.java:135)
        at io.digdag.core.agent.OperatorManager.run(OperatorManager.java:119)
        at io.digdag.cli.Run$OperatorManagerWithSkip.run(Run.java:687)
        at io.digdag.core.agent.MultiThreadAgent.lambda$null$0(MultiThreadAgent.java:127)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
2020-07-11 11:21:25 +0900 [INFO](0017@[0:default]+my_workflow^failure-alert): type: notify
error: 
  * +my_workflow+prepare+check_loaded_data:
    Command failed with code 1 (runtime)
  * +my_workflow+prepare+check_loaded_data:
    Command failed with code 1 (runtime)
  * +my_workflow+prepare+check_loaded_data:
    Command failed with code 1 (runtime)
  * +my_workflow+prepare+check_loaded_data:
    Command failed with code 1 (runtime)

You can also set a retry interval on `_retry` as follows:

+prepare:
  _retry:
    limit: 3
    interval: 10
    interval_type: exponential

■ limit: number of retries
■ interval: retry interval (seconds)
■ interval_type: `constant` or `exponential`. With `constant`, the retry interval stays constant. With `exponential`, the interval becomes `interval x 2^(retry_count - 1)`, doubling with each retry. In the example above, the first retry interval is 10 seconds, the second is 20 seconds, and the third is 40 seconds.

**Sending error notification**

# Executed when the workflow fails
_error:
  sh>: tasks/runs_when_workflow_failed.sh

+analyze:
  sh>: tasks/analyze_prepared_data_sets.sh

Email can be sent when an error occurs by using the `mail>` operator. A detailed explanation will come in the Operators chapter: http://docs.digdag.io/operators/mail.html
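
For example, combining `_error:` with the `mail>` operator might look like the sketch below. The body file and recipient address are placeholders, and SMTP settings must also be configured as described on the mail> operator page:

_error:
  mail>: error_body.txt        # mail body template (placeholder file)
  subject: my_workflow failed
  to: [admin@example.com]      # placeholder recipient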
