[Ruby] Learn Digdag from the Digdag Official Documentation - Workflow definition

7 minute read

Target

A translation of the Workflow definition page of the official Digdag documentation, plus some extras. The final goal is to build a batch job in Rails using Digdag's Ruby API. http://docs.digdag.io/workflow_definition.html

Table of contents

- Getting started
- Architecture
- Concepts
- Workflow definition
- Scheduling workflow
- Operators
- Command reference
- Language API - Ruby
- Change the setting value for each environment with Digdag (RubyOnRails)
- Batch implementation in RubyOnRails environment using Digdag

Workflow definition

Workflow definition: *.dig files

A Digdag workflow is defined in a file with a `.dig` extension. The name of the file is the name of the workflow.

For example, to create a workflow named hello_world, you create a `hello_world.dig` file. The contents of the file look like this:

#### **`hello_world.dig`**
```yml
timezone: Asia/Tokyo

+step1:
  sh>: tasks/shell_sample.sh

+step2:
  py>: tasks.MyWorkflow.step2
  param1: this is param1

+step3:
  rb>: MyWorkflow.step3
  require: tasks/ruby_sample.rb
```

The timezone parameter is used to configure the time zone of the workflow and affects the session timestamp variables and scheduling. The default time zone is `UTC`. Examples of other valid time zones include `America/Los_Angeles`, `Europe/Berlin`, and `Asia/Tokyo`.

### **The + symbol means a task**
Key names that start with a `+` sign are tasks. Tasks are executed sequentially from the top. A task can be nested as a child of another task. In the above example, `+step2` is executed after `+step1` as a child of the `+main` task.

operators>

A task with a `type>: command` or `_type: NAME` parameter executes an action. You can choose from various kinds of operators, such as running shell scripts, calling Python methods, or sending email. See the Operators page for the list of built-in operators.

`foo>: bar` is equivalent to the following two parameters:

```yml
_type: foo
_command: bar
```
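For example, applying that same rule to the sh> operator used earlier (my own illustration; the two forms below are interchangeable):

```yml
# These two tasks do exactly the same thing:
+short_form:
  sh>: echo hello

+long_form:
  _type: sh
  _command: echo hello
```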

Using ${variables}

Workflows can embed variables using the `${...}` syntax. You can use built-in variables or define your own. See the official documentation for the list of built-in variables: http://docs.digdag.io/workflow_definition.html
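As a quick illustration (my own example; `session_date` and `session_time` are among the documented built-in variables):

```yml
+show_builtins:
  sh>: echo date:${session_date} time:${session_time}
```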

Calculating variables

You can calculate variables using basic JavaScript within the `${...}` syntax. A common use case is formatting a timestamp into another format. Digdag bundles Moment.js for time calculations.

#### **`my_workflow.dig`**
```yml
timezone: Asia/Tokyo

+format_session_time:
  echo>: ${moment(session_time).format("YYYY-MM-DD HH:mm:ss Z")}

+format_in_utc:
  echo>: ${moment(session_time).utc().format("YYYY-MM-DD HH:mm:ss")}

+format_tomorrow:
  echo>: ${moment(session_time).add(1,'days').format("LLL")}

+get_execution_time:
  echo>: ${moment().format("YYYY-MM-DD HH:mm:ss Z")}

```

Result:

```
2020-07-10 22:05:34 +0900 [INFO](0017@[0:default]+my_workflow+format_session_time): echo>: 2020-07-10 09:00:00 +09:00
2020-07-10 09:00:00 +09:00
2020-07-10 22:05:35 +0900 [INFO](0017@[0:default]+my_workflow+format_in_utc): echo>: 2020-07-10 00:00:00
2020-07-10 00:00:00
2020-07-10 22:05:35 +0900 [INFO](0017@[0:default]+my_workflow+format_tomorrow): echo>: July 11, 2020 9:00 AM
July 11, 2020 9:00 AM
2020-07-10 22:05:36 +0900 [INFO](0017@[0:default]+my_workflow+get_execution_time): echo>: 2020-07-10 22:05:36 +09:00
2020-07-10 22:05:36 +09:00
```

Defining variables

There are three ways to define variables:

1. Using the _export parameter in YAML
2. Setting variables programmatically using the language API
3. Starting a session with variables

Using _export: parameter

In a YAML file, the `_export:` directive defines variables. This is useful for loading static configuration such as database host names.

If a task has an `_export` directive, the task and its children can use the variables, because `_export` defines variables in its scope.


#### **`my_workflow.dig`**
```yml

_export:
  foo: 1

+prepare:
  sh>: echo foo:${foo} bar:${bar}

+analyze:
  _export:
    bar: 2

  +step1:
    sh>: echo foo:${foo} bar:${bar}

+dump:
  sh>: echo foo:${foo} bar:${bar}

```

Result:

```
2020-07-10 22:18:11 +0900 [INFO](0017@[0:default]+my_workflow+prepare): sh>: echo foo:1
foo:1
2020-07-10 22:18:11 +0900 [INFO](0017@[0:default]+my_workflow+analyze+step1): sh>: echo foo:1 bar:2
foo:1 bar:2
2020-07-10 22:18:12 +0900 [INFO](0017@[0:default]+my_workflow+dump): sh>: echo foo:1
foo:1
```

foo=1 can be used by all tasks, but bar=2 can be used only by +step1 (and the other tasks under +analyze).
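One related detail, added here as a sketch of the scoping behavior as I understand it: an inner `_export` can override an outer value, and the override applies only within that task and its children.

```yml
_export:
  foo: 1

+inner:
  _export:
    foo: 10              # overrides foo for this task and its children only
  sh>: echo foo:${foo}   # prints foo:10

+outer:
  sh>: echo foo:${foo}   # prints foo:1
```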

Using API

You can set variables programmatically using the language APIs. For example, the Python API provides digdag.env.export and digdag.env.store. This series focuses on the Ruby API; the APIs are covered in another chapter, so I will not explain them in detail here.

```py
import digdag

class MyWorkflow(object):
  def prepare(self):
    # Store my_param so that later tasks can use it.
    digdag.env.store({"my_param": 2})

  def analyze(self, my_param):
    # my_param is passed in automatically from the stored variables.
    print("my_param should be 2: %d" % my_param)
```

Starting a session with variables

You can set variables when starting a new workflow session. To set variables, use `-p KEY=VALUE` as many times as needed.

There is no sample in the official documentation, so let's create a workflow that simply prints parameters received from the outside to the console.

#### **`my_workflow.dig`**
```yml

+print_my_var1:
  sh>: echo my_var1:${my_var1}

+print_my_var2:
  sh>: echo my_var2:${my_var2}

```

You can receive my_var1 and my_var2 as run-time parameters. Run it as follows:

```
digdag run my_workflow.dig --rerun -p my_var1=foo -p my_var2=bar
```

Result:

```
2020-07-11 10:29:33 +0900 [INFO](0017@[0:default]+my_workflow+print_my_var1): sh>: echo my_var1:foo
my_var1:foo
2020-07-11 10:29:33 +0900 [INFO](0017@[0:default]+my_workflow+print_my_var2): sh>: echo my_var2:bar
my_var2:bar
```
## **!include another file**

You can split your YAML file into smaller files to organize complex workflows. You can include the split files in a workflow using `!include`.

#### **`my_workflow.dig`**
```yml
+task1:
    !include : 'tasks/task1.dig'
+task2:
    !include : 'tasks/task2.dig'
```

#### **`tasks/task1.dig`**
```yml
+task1:
    sh>: echo Task 1
```

#### **`tasks/task2.dig`**
```yml
+task2:
    sh>: echo Task 2
```

If you execute my_workflow.dig, you can see that the tasks in `task1.dig` and `task2.dig` are executed.

#### **`run`**
```
$ digdag run my_workflow.dig --rerun
2020-07-11 10:40:39 +0900 [INFO](0017@[0:default]+my_workflow+task1+task1): sh>: echo Task 1
Task 1
2020-07-11 10:40:40 +0900 [INFO](0017@[0:default]+my_workflow+task2+task2): sh>: echo Task 2
Task 2
```

Parallel execution

If `_parallel: true` is set on a group, the tasks in the group run in parallel.

#### **`my_workflow.dig`**
```yml

+prepare:
  # +data1, +data2, and +data3 run in parallel.
  _parallel: true

  +data1:
    sh>: echo data1
  +data2:
    sh>: echo data2
  +data3:
    sh>: echo data3
+analyze:
  sh>: echo analyze

```

data1, data2, and data3 are output in parallel.

Result:

```
$ digdag run my_workflow.dig --rerun
2020-07-11 10:50:45 +0900 [INFO](0018@[0:default]+my_workflow+prepare+data2): sh>: echo data2
2020-07-11 10:50:45 +0900 [INFO](0017@[0:default]+my_workflow+prepare+data1): sh>: echo data1
2020-07-11 10:50:45 +0900 [INFO](0019@[0:default]+my_workflow+prepare+data3): sh>: echo data3
data1
data2
data3
2020-07-11 10:50:45 +0900 [INFO](0019@[0:default]+my_workflow+analyze): sh>: echo analyze
analyze
```

If `_background: true` is set on a task or group, it runs in parallel with the previous task. The next task waits for the background task or group to complete.

#### **`my_workflow.dig`**
```yml

+prepare:
  +data1:
    sh>: echo data1

  # +data1 and +data2 run in parallel.
  +data2:
    _background: true
    sh>: echo data2

  # +data3 runs after +data1 and +data2.
  +data3:
    sh>: echo data3

+analyze:
  sh>: echo analyze

```

data1 and data2 are executed in parallel.

Result:

```
$ digdag run my_workflow.dig --rerun

2020-07-11 11:00:06 +0900 [INFO](0018@[0:default]+my_workflow+prepare+data2): sh>: echo data2
2020-07-11 11:00:06 +0900 [INFO](0017@[0:default]+my_workflow+prepare+data1): sh>: echo data1
data2
data1
2020-07-11 11:00:06 +0900 [INFO](0018@[0:default]+my_workflow+prepare+data3): sh>: echo data3
data3
2020-07-11 11:00:06 +0900 [INFO](0018@[0:default]+my_workflow+analyze): sh>: echo analyze
```

Retrying failed tasks automatically

If the _retry: N (N is an integer: 1, 2, 3, …) parameter is set for a group, the group is retried from the beginning when one or more child tasks fail.

#### **`my_workflow.dig`**
```yml
+prepare:
  # If +erase_table, +load_data, or +check_loaded_data fails, it retries from
  # +erase_table again.
  _retry: 3

  +erase_table:
    sh>: echo erase_table

  +load_data:
    sh>: echo load_data

  +check_loaded_data:
    sh>: tasks/error.sh

+analyze:
  sh>: echo analyze
```

#### **`tasks/error.sh`**
```sh
#!/bin/bash
# Always exit with a non-zero status so the task fails and triggers _retry.
exit 1
```

The first run plus three retries produce four errors in total. If the command succeeds during a retry, you will see fewer errors.

Result:

```
$ digdag run my_workflow.dig --rerun
2020-07-11 11:21:21 +0900 [INFO](0017@[0:default]+my_workflow+prepare+erase_table): sh>: echo erase_table
erase_table
2020-07-11 11:21:21 +0900 [INFO](0017@[0:default]+my_workflow+prepare+load_data): sh>: echo load_data
load_data
2020-07-11 11:21:22 +0900 [INFO](0017@[0:default]+my_workflow+prepare+check_loaded_data): sh>: tasks/error.sh
2020-07-11 11:21:22 +0900 [ERROR](0017@[0:default]+my_workflow+prepare+check_loaded_data): Task failed with unexpected error: Command failed with code 1
java.lang.RuntimeException: Command failed with code 1
        at io.digdag.standards.operator.ShOperatorFactory$ShOperator.runTask(ShOperatorFactory.java:143)
        at io.digdag.util.BaseOperator.run(BaseOperator.java:35)
        at io.digdag.core.agent.OperatorManager.callExecutor(OperatorManager.java:315)
        at io.digdag.cli.Run$OperatorManagerWithSkip.callExecutor(Run.java:705)
        at io.digdag.core.agent.OperatorManager.runWithWorkspace(OperatorManager.java:257)
        at io.digdag.core.agent.OperatorManager.lambda$runWithHeartbeat$2(OperatorManager.java:137)
        at io.digdag.core.agent.LocalWorkspaceManager.withExtractedArchive(LocalWorkspaceManager.java:25)
        at io.digdag.core.agent.OperatorManager.runWithHeartbeat(OperatorManager.java:135)
        at io.digdag.core.agent.OperatorManager.run(OperatorManager.java:119)
        at io.digdag.cli.Run$OperatorManagerWithSkip.run(Run.java:687)
        at io.digdag.core.agent.MultiThreadAgent.lambda$null$0(MultiThreadAgent.java:127)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
2020-07-11 11:21:22 +0900 [INFO](0017@[0:default]+my_workflow+prepare+erase_table): sh>: echo erase_table
erase_table
2020-07-11 11:21:22 +0900 [INFO](0017@[0:default]+my_workflow+prepare+load_data): sh>: echo load_data
load_data
2020-07-11 11:21:23 +0900 [INFO](0017@[0:default]+my_workflow+prepare+check_loaded_data): sh>: tasks/error.sh
2020-07-11 11:21:23 +0900 [ERROR](0017@[0:default]+my_workflow+prepare+check_loaded_data): Task failed with unexpected error: Command failed with code 1
java.lang.RuntimeException: Command failed with code 1
        ... (same stack trace as above, omitted)
2020-07-11 11:21:23 +0900 [INFO](0017@[0:default]+my_workflow+prepare+erase_table): sh>: echo erase_table
erase_table
2020-07-11 11:21:23 +0900 [INFO](0017@[0:default]+my_workflow+prepare+load_data): sh>: echo load_data
load_data
2020-07-11 11:21:24 +0900 [INFO](0017@[0:default]+my_workflow+prepare+check_loaded_data): sh>: tasks/error.sh
2020-07-11 11:21:24 +0900 [ERROR](0017@[0:default]+my_workflow+prepare+check_loaded_data): Task failed with unexpected error: Command failed with code 1
java.lang.RuntimeException: Command failed with code 1
        ... (same stack trace as above, omitted)
2020-07-11 11:21:24 +0900 [INFO](0017@[0:default]+my_workflow+prepare+erase_table): sh>: echo erase_table
erase_table
2020-07-11 11:21:24 +0900 [INFO](0017@[0:default]+my_workflow+prepare+load_data): sh>: echo load_data
load_data
2020-07-11 11:21:24 +0900 [INFO](0017@[0:default]+my_workflow+prepare+check_loaded_data): sh>: tasks/error.sh
2020-07-11 11:21:24 +0900 [ERROR](0017@[0:default]+my_workflow+prepare+check_loaded_data): Task failed with unexpected error: Command failed with code 1
java.lang.RuntimeException: Command failed with code 1
        ... (same stack trace as above, omitted)
2020-07-11 11:21:25 +0900 [INFO](0017@[0:default]+my_workflow^failure-alert): type: notify
error: 
  * +my_workflow+prepare+check_loaded_data:
    Command failed with code 1 (runtime)
  * +my_workflow+prepare+check_loaded_data:
    Command failed with code 1 (runtime)
  * +my_workflow+prepare+check_loaded_data:
    Command failed with code 1 (runtime)
  * +my_workflow+prepare+check_loaded_data:
    Command failed with code 1 (runtime)
```

You can also set an interval on `_retry`, as follows:

```yml
+prepare:
  _retry:
    limit: 3
    interval: 10
    interval_type: exponential
```

- limit: number of retries
- interval: interval between retries, in seconds
- interval_type: `constant` or `exponential`. With `constant`, the retry interval is fixed. With `exponential`, the interval is multiplied by 2^(retry_count - 1), doubling with each retry. In the example above, the first retry interval is 10 seconds, the second is 20 seconds, and the third is 40 seconds.
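For contrast, a minimal sketch of the constant case (my own example; same parameters, only interval_type changed):

```yml
+prepare:
  _retry:
    limit: 3
    interval: 10
    interval_type: constant   # waits a fixed 10s before every retry
```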

Sending error notification

```yml
# Runs when the workflow fails
_error:
  sh>: tasks/runs_when_workflow_failed.sh

+analyze:
  sh>: tasks/analyze_prepared_data_sets.sh
```

When an error occurs, you can send an email using the mail> operator. A detailed explanation will come in the chapter on operators. http://docs.digdag.io/operators/mail.html
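As a preview, a minimal sketch of that pattern, assuming the mail> operator's documented parameters; the SMTP settings, addresses, and template file below are placeholders:

```yml
_export:
  mail:
    host: smtp.example.com   # placeholder SMTP server
    port: 587
    from: digdag@example.com

# Runs when the workflow fails
_error:
  mail>: mail_body.txt       # template file used as the mail body
  subject: my_workflow failed
  to: [admin@example.com]
```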