A translation of the Workflow definition page from the official Digdag documentation, plus some extras. The end goal is to build a batch job in Rails using Digdag's Ruby support. http://docs.digdag.io/workflow_definition.html
Workflow definition: .dig files

A Digdag workflow is defined in a file with the `.dig` extension. The name of the file is the name of the workflow.

For example, to create a workflow named hello_world, you create a file called `hello_world.dig`. The contents of the file look like this:
#### **`hello_world.dig`**
```yaml
timezone: Asia/Tokyo

+step1:
  sh>: tasks/shell_sample.sh

+step2:
  py>: tasks.MyWorkflow.step2
  param1: this is param1

+step3:
  rb>: MyWorkflow.step3
  require: tasks/ruby_sample.rb
```
timezone

The `timezone` parameter configures the workflow's time zone; it affects the session timestamp variables and scheduling. The default time zone is `UTC`. Examples of other valid time zones are `America/Los_Angeles`, `Europe/Berlin`, and `Asia/Tokyo`.
Key names that start with a `+` sign are tasks. Tasks are executed in order from the top, and a task can be nested as a child of another task. In the example above, +step2 runs after +step1, and +step3 runs after +step2.
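hello_world.dig above is flat; a minimal sketch of nesting, where +step1 and +step2 run as children of a +main group and +teardown runs after the whole group (task names here are illustrative, not from the original example):

```yaml
+main:
  +step1:
    sh>: echo step1
  +step2:
    sh>: echo step2

+teardown:
  sh>: echo runs after the whole +main group
```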
operators>

A task with a `type>: command` or `_type: NAME` parameter executes an action through an operator. You can choose from many types of operators, such as running shell scripts, calling Python methods, or sending email. See the Operators page for the list of built-in operators.
`foo>: bar` is shorthand for the following two parameters:

```yaml
_type: foo
_command: bar
```
Using ${variables}

Workflows can embed variables using the `${...}` syntax. You can use built-in variables or define your own. See the documentation for the list of built-in variables: http://docs.digdag.io/workflow_definition.html
Calculating variables

You can calculate variables using basic JavaScript expressions inside the `${...}` syntax. A common use case is formatting a timestamp into another format. Digdag bundles Moment.js for time calculations.
#### **`my_workflow.dig`**
```yaml
timezone: Asia/Tokyo

+format_session_time:
  echo>: ${moment(session_time).format("YYYY-MM-DD HH:mm:ss Z")}

+format_in_utc:
  echo>: ${moment(session_time).utc().format("YYYY-MM-DD HH:mm:ss")}

+format_tomorrow:
  echo>: ${moment(session_time).add(1, 'days').format("LLL")}

+get_execution_time:
  echo>: ${moment().format("YYYY-MM-DD HH:mm:ss Z")}
```
result

```sh
2020-07-10 22:05:34 +0900 [INFO](0017@[0:default]+my_workflow+format_session_time): echo>: 2020-07-10 09:00:00 +09:00
2020-07-10 09:00:00 +09:00
2020-07-10 22:05:35 +0900 [INFO](0017@[0:default]+my_workflow+format_in_utc): echo>: 2020-07-10 00:00:00
2020-07-10 00:00:00
2020-07-10 22:05:35 +0900 [INFO](0017@[0:default]+my_workflow+format_tomorrow): echo>: July 11, 2020 9:00 AM
July 11, 2020 9:00 AM
2020-07-10 22:05:36 +0900 [INFO](0017@[0:default]+my_workflow+get_execution_time): echo>: 2020-07-10 22:05:36 +09:00
2020-07-10 22:05:36 +09:00
```
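For reference, since the end goal is Ruby, the Moment.js calls above have close equivalents in plain Ruby `Time` formatting (the session time here matches the log above):

```ruby
require 'time'

# session_time as it appears in the log above (09:00 JST = 00:00 UTC)
session_time = Time.parse("2020-07-10T09:00:00+09:00")

# moment(session_time).format("YYYY-MM-DD HH:mm:ss Z")
puts session_time.strftime("%Y-%m-%d %H:%M:%S %:z")     # 2020-07-10 09:00:00 +09:00

# moment(session_time).utc().format("YYYY-MM-DD HH:mm:ss")
# getutc returns a new UTC Time without mutating session_time
puts session_time.getutc.strftime("%Y-%m-%d %H:%M:%S")  # 2020-07-10 00:00:00

# moment(session_time).add(1, 'days') — one day later
puts (session_time + 24 * 60 * 60).strftime("%Y-%m-%d") # 2020-07-11
```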
Defining variables

There are three ways to define variables:

1. Use the `_export:` parameter in YAML
2. Set variables programmatically using the API
3. Start a session with variables
Using _export: parameter

In a YAML file, the `_export:` directive defines variables. This is useful for loading static configuration such as database hostnames. If a task has an `_export:` directive, the defined variables are in scope for that task and its children.
#### **`my_workflow.dig`**
```yaml
_export:
  foo: 1

+prepare:
  sh>: echo foo:${foo} bar:${bar}

+analyze:
  _export:
    bar: 2

  +step1:
    sh>: echo foo:${foo} bar:${bar}

+dump:
  sh>: echo foo:${foo} bar:${bar}
```
result

```sh
2020-07-10 22:18:11 +0900 [INFO](0017@[0:default]+my_workflow+prepare): sh>: echo foo:1
foo:1
2020-07-10 22:18:11 +0900 [INFO](0017@[0:default]+my_workflow+analyze+step1): sh>: echo foo:1 bar:2
foo:1 bar:2
2020-07-10 22:18:12 +0900 [INFO](0017@[0:default]+my_workflow+dump): sh>: echo foo:1
foo:1
```
foo=1 is available to all tasks, but bar=2 is available only within +analyze and its child +step1.
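Conceptually, `_export` scoping works like merging each ancestor's exports into the child's scope, innermost last. A small Ruby sketch of that lookup rule (the `resolve` function is illustrative, not part of Digdag):

```ruby
# Illustrative model of _export scoping: each task sees its own
# exports merged over everything exported by its ancestors.
def resolve(scopes)
  scopes.reduce({}) { |vars, exports| vars.merge(exports) }
end

workflow = { "foo" => 1 }          # top-level _export
analyze  = { "bar" => 2 }          # _export inside +analyze

# +prepare and +dump only see the workflow-level scope.
p resolve([workflow])              # => {"foo"=>1}
# +step1 (child of +analyze) sees both scopes merged.
p resolve([workflow, analyze])     # => {"foo"=>1, "bar"=>2}
```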
Using API

You can set variables programmatically using the language APIs. For example, the Python API provides digdag.env.export and digdag.env.store. This series focuses on the Ruby API; since the API is covered in a separate chapter, I won't explain it in detail here.
```python
import digdag

class MyWorkflow(object):
    def prepare(self):
        digdag.env.store({"my_param": 2})

    def analyze(self, my_param):
        print("my_param should be 2: %d" % my_param)
```
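Since this series targets Ruby, note that the Ruby API exposes the same idea through `Digdag.env.store` (see the Language API - Ruby page). A minimal sketch with a hypothetical stand-in for the runtime-provided `Digdag` object, so the flow can be followed outside Digdag; the stand-in's `params` reader is part of this sketch, not necessarily the real API surface:

```ruby
# Hypothetical stand-in for the Digdag-provided runtime object, only to
# illustrate how stored variables connect two tasks. Inside Digdag you
# would call Digdag.env.store directly without defining this module.
module Digdag
  class Env
    attr_reader :params

    def initialize
      @params = {}
    end

    # Stored variables become available to subsequent tasks.
    def store(hash)
      @params.merge!(hash)
    end
  end

  def self.env
    @env ||= Env.new
  end
end

class MyWorkflow
  def prepare
    Digdag.env.store(my_param: 2)
  end

  def analyze
    my_param = Digdag.env.params[:my_param]
    puts "my_param should be 2: #{my_param}"
  end
end

wf = MyWorkflow.new
wf.prepare
wf.analyze  # prints "my_param should be 2: 2"
```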
Starting a session with variables

You can set variables when starting a new workflow session. Use `-p KEY=VALUE`, repeating the option to set multiple variables.

There is no sample in the official documentation, so let's create a workflow that simply prints externally supplied parameters to the console.
#### **`my_workflow.dig`**
```yaml
+print_my_var1:
  sh>: echo my_var1:${my_var1}

+print_my_var2:
  sh>: echo my_var2:${my_var2}
```
my_var1 and my_var2 are taken from the run-time parameters. Run it as follows:

```sh
digdag run my_workflow.dig --rerun -p my_var1=foo -p my_var2=bar
```
result

```sh
2020-07-11 10:29:33 +0900 [INFO](0017@[0:default]+my_workflow+print_my_var1): sh>: echo my_var1:foo
my_var1:foo
2020-07-11 10:29:33 +0900 [INFO](0017@[0:default]+my_workflow+print_my_var2): sh>: echo my_var2:bar
my_var2:bar
```
!include another file

You can split a YAML file into smaller files to organize complex workflows. Use `!include` to pull the split files back into your workflow.
#### **`my_workflow.dig`**
```yaml
+task1:
  !include : 'tasks/task1.dig'
+task2:
  !include : 'tasks/task2.dig'
```

#### **`tasks/task1.dig`**
```yaml
+task1:
  sh>: echo Task 1
```

#### **`tasks/task2.dig`**
```yaml
+task2:
  sh>: echo Task 2
```
When you run my_workflow.dig, you can see that the tasks in task1.dig and task2.dig are executed.
#### **`run`**
```sh
$ digdag run my_workflow.dig --rerun
2020-07-11 10:40:39 +0900 [INFO](0017@[0:default]+my_workflow+task1+task1): sh>: echo Task 1
Task 1
2020-07-11 10:40:40 +0900 [INFO](0017@[0:default]+my_workflow+task2+task2): sh>: echo Task 2
Task 2
```
Parallel execution

If `_parallel: true` is set on a group, the tasks in the group run in parallel.
#### **`my_workflow.dig`**
```yaml
+prepare:
  # +data1, +data2, and +data3 run in parallel.
  _parallel: true

  +data1:
    sh>: echo data1

  +data2:
    sh>: echo data2

  +data3:
    sh>: echo data3

+analyze:
  sh>: echo analyze
```
data1, data2, and data3 are output in parallel.
Execution result

```sh
$ digdag run my_workflow.dig --rerun
2020-07-11 10:50:45 +0900 [INFO](0018@[0:default]+my_workflow+prepare+data2): sh>: echo data2
2020-07-11 10:50:45 +0900 [INFO](0017@[0:default]+my_workflow+prepare+data1): sh>: echo data1
2020-07-11 10:50:45 +0900 [INFO](0019@[0:default]+my_workflow+prepare+data3): sh>: echo data3
data1
data2
data3
2020-07-11 10:50:45 +0900 [INFO](0019@[0:default]+my_workflow+analyze): sh>: echo analyze
analyze
```
_background: true

If `_background: true` is set on a task or group, it runs in parallel with the previous task. The next task waits for the background task or group to complete.
#### **`my_workflow.dig`**
```yaml
+prepare:
  +data1:
    sh>: echo data1

  # +data1 and +data2 run in parallel.
  +data2:
    _background: true
    sh>: echo data2

  # +data3 runs after +data1 and +data2.
  +data3:
    sh>: echo data3

+analyze:
  sh>: echo analyze
```
data1 and data2 are executed in parallel.
result

```sh
$ digdag run my_workflow.dig --rerun
2020-07-11 11:00:06 +0900 [INFO](0018@[0:default]+my_workflow+prepare+data2): sh>: echo data2
2020-07-11 11:00:06 +0900 [INFO](0017@[0:default]+my_workflow+prepare+data1): sh>: echo data1
data2
data1
2020-07-11 11:00:06 +0900 [INFO](0018@[0:default]+my_workflow+prepare+data3): sh>: echo data3
data3
2020-07-11 11:00:06 +0900 [INFO](0018@[0:default]+my_workflow+analyze): sh>: echo analyze
```
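Conceptually, `_background: true` behaves as if the task were started on a background thread alongside the previous task, with the next task joining both before it runs. A rough Ruby analogy of that ordering guarantee (this illustrates the semantics only, not how Digdag is implemented):

```ruby
order = Thread::Queue.new  # thread-safe log of task completions

# +data1 and +data2 (_background: true) overlap in time.
data1 = Thread.new { order << "data1" }
data2 = Thread.new { order << "data2" }

# +data3 starts only after both have finished.
[data1, data2].each(&:join)
order << "data3"

results = []
results << order.pop until order.empty?
puts results.inspect  # data1 and data2 in either order, then data3
```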
Retrying failed tasks automatically

If `_retry: N` (where N is an integer: 1, 2, 3, ...) is set on a group, the group is retried from the beginning when one or more of its child tasks fail.
#### **`my_workflow.dig`**
```yaml
+prepare:
  # If +erase_table, +load_data, or +check_loaded_data fails, it retries from
  # +erase_table again.
  _retry: 3

  +erase_table:
    sh>: echo erase_table

  +load_data:
    sh>: echo load_data

  +check_loaded_data:
    sh>: tasks/error.sh

+analyze:
  sh>: echo analyze
```
#### **`tasks/error.sh`**
```sh
#!/bin/bash
exit 1
```
The initial run plus the 3 retries produced 4 errors in total. If a retry succeeds partway through, the number of errors is smaller.
result

```sh
$ digdag run my_workflow.dig --rerun
2020-07-11 11:21:21 +0900 [INFO](0017@[0:default]+my_workflow+prepare+erase_table): sh>: echo erase_table
erase_table
2020-07-11 11:21:21 +0900 [INFO](0017@[0:default]+my_workflow+prepare+load_data): sh>: echo load_data
load_data
2020-07-11 11:21:22 +0900 [INFO](0017@[0:default]+my_workflow+prepare+check_loaded_data): sh>: tasks/error.sh
2020-07-11 11:21:22 +0900 [ERROR](0017@[0:default]+my_workflow+prepare+check_loaded_data): Task failed with unexpected error: Command failed with code 1
java.lang.RuntimeException: Command failed with code 1
	at io.digdag.standards.operator.ShOperatorFactory$ShOperator.runTask(ShOperatorFactory.java:143)
	at io.digdag.util.BaseOperator.run(BaseOperator.java:35)
	at io.digdag.core.agent.OperatorManager.callExecutor(OperatorManager.java:315)
	at io.digdag.cli.Run$OperatorManagerWithSkip.callExecutor(Run.java:705)
	at io.digdag.core.agent.OperatorManager.runWithWorkspace(OperatorManager.java:257)
	at io.digdag.core.agent.OperatorManager.lambda$runWithHeartbeat$2(OperatorManager.java:137)
	at io.digdag.core.agent.LocalWorkspaceManager.withExtractedArchive(LocalWorkspaceManager.java:25)
	at io.digdag.core.agent.OperatorManager.runWithHeartbeat(OperatorManager.java:135)
	at io.digdag.core.agent.OperatorManager.run(OperatorManager.java:119)
	at io.digdag.cli.Run$OperatorManagerWithSkip.run(Run.java:687)
	at io.digdag.core.agent.MultiThreadAgent.lambda$null$0(MultiThreadAgent.java:127)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
2020-07-11 11:21:22 +0900 [INFO](0017@[0:default]+my_workflow+prepare+erase_table): sh>: echo erase_table
erase_table
2020-07-11 11:21:22 +0900 [INFO](0017@[0:default]+my_workflow+prepare+load_data): sh>: echo load_data
load_data
2020-07-11 11:21:23 +0900 [INFO](0017@[0:default]+my_workflow+prepare+check_loaded_data): sh>: tasks/error.sh
2020-07-11 11:21:23 +0900 [ERROR](0017@[0:default]+my_workflow+prepare+check_loaded_data): Task failed with unexpected error: Command failed with code 1
java.lang.RuntimeException: Command failed with code 1
	(stack trace identical to the first, omitted)
2020-07-11 11:21:23 +0900 [INFO](0017@[0:default]+my_workflow+prepare+erase_table): sh>: echo erase_table
erase_table
2020-07-11 11:21:23 +0900 [INFO](0017@[0:default]+my_workflow+prepare+load_data): sh>: echo load_data
load_data
2020-07-11 11:21:24 +0900 [INFO](0017@[0:default]+my_workflow+prepare+check_loaded_data): sh>: tasks/error.sh
2020-07-11 11:21:24 +0900 [ERROR](0017@[0:default]+my_workflow+prepare+check_loaded_data): Task failed with unexpected error: Command failed with code 1
java.lang.RuntimeException: Command failed with code 1
	(stack trace identical to the first, omitted)
2020-07-11 11:21:24 +0900 [INFO](0017@[0:default]+my_workflow+prepare+erase_table): sh>: echo erase_table
erase_table
2020-07-11 11:21:24 +0900 [INFO](0017@[0:default]+my_workflow+prepare+load_data): sh>: echo load_data
load_data
2020-07-11 11:21:24 +0900 [INFO](0017@[0:default]+my_workflow+prepare+check_loaded_data): sh>: tasks/error.sh
2020-07-11 11:21:24 +0900 [ERROR](0017@[0:default]+my_workflow+prepare+check_loaded_data): Task failed with unexpected error: Command failed with code 1
java.lang.RuntimeException: Command failed with code 1
	(stack trace identical to the first, omitted)
2020-07-11 11:21:25 +0900 [INFO](0017@[0:default]+my_workflow^failure-alert): type: notify
error: 
  * +my_workflow+prepare+check_loaded_data:
    Command failed with code 1 (runtime)
  * +my_workflow+prepare+check_loaded_data:
    Command failed with code 1 (runtime)
  * +my_workflow+prepare+check_loaded_data:
    Command failed with code 1 (runtime)
  * +my_workflow+prepare+check_loaded_data:
    Command failed with code 1 (runtime)
```
You can also set a retry interval for `_retry` as follows:

```yaml
+prepare:
  _retry:
    limit: 3
    interval: 10
    interval_type: exponential
```
■ limit: number of retries
■ interval: retry interval (seconds)
■ interval_type: `constant` or `exponential`. With `constant`, the retry interval stays fixed at `interval` seconds. With `exponential`, the interval is `interval × 2^(retry_count - 1)` seconds, doubling on each retry. In the example above, the first retry waits 10 seconds, the second 20 seconds, and the third 40 seconds.
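The exponential schedule above can be checked with a few lines of Ruby (`interval: 10` and `limit: 3` match the example):

```ruby
# Delay before the n-th retry with interval_type: exponential:
# interval * 2^(n - 1) seconds.
def retry_interval(interval, n)
  interval * 2**(n - 1)
end

(1..3).each do |n|
  puts "retry #{n}: waits #{retry_interval(10, n)}s"
end
# retry 1: waits 10s
# retry 2: waits 20s
# retry 3: waits 40s
```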
Sending error notification

A task defined with `_error:` is executed when the workflow fails:

```yaml
# Executed when the workflow fails
_error:
  sh>: tasks/runs_when_workflow_failed.sh

+analyze:
  sh>: tasks/analyze_prepared_data_sets.sh
```
You can send an email at the moment of the error using the `mail>` operator. A detailed explanation will come with the operator documentation:
http://docs.digdag.io/operators/mail.html
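A minimal sketch of that combination, assuming the parameters described on the `mail>` operator page (the address, subject, and body file below are placeholders, and the required SMTP settings are omitted):

```yaml
_error:
  mail>: body.txt              # file whose contents become the mail body
  subject: my_workflow failed
  to: [admin@example.com]
```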