[Ruby] Learning Digdag from the official documentation: Operators ① Workflow control operators


Goal

This article translates the Operators section of the official Digdag documentation, with some additions. The final goal is to build batch jobs in Rails using Digdag's Ruby support.
http://docs.digdag.io/operators/workflow_control.html

Table of contents

- Getting started
- Architecture
- Concepts
- Workflow definition
- Scheduling workflow
- Operators
- Command reference
- Language API - Ruby
- Change the setting value for each environment with Digdag (RubyOnRails)
- Batch implementation in RubyOnRails environment using Digdag

Operators

Workflow control operators

call>: Call another workflow

workflow1.dig


timezone: Asia/Tokyo

+step1:
  call>: another_workflow.dig
+step2:
  call>: common/shared_workflow.dig

another_workflow.dig


+step1:
    sh>: echo hi! another_workflow.dig

common/shared_workflow.dig


+step1:
    sh>: echo hi! ./common/shared_workflow.dig

result


$ digdag run workflow1.dig --rerun
2020-07-12 13:38:54 +0900 [INFO](0017@[0:default]+workflow1+step1): call>: another_workflow.dig
2020-07-12 13:38:54 +0900 [INFO](0017@[0:default]+workflow1+step1^sub+step1): sh>: echo hi! another_workflow.dig
hi! another_workflow.dig
2020-07-12 13:38:54 +0900 [INFO](0017@[0:default]+workflow1+step2): call>: common/shared_workflow.dig
2020-07-12 13:38:55 +0900 [INFO](0017@[0:default]+workflow1+step2^sub+step1): sh>: echo hi! ./common/shared_workflow.dig
hi! ./common/shared_workflow.dig

Options

call>: FILE
Path to the workflow definition file. The file name must end with .dig. If the called workflow is in a subdirectory, the called workflow uses that subdirectory as its working directory. For example, if a task has call>: common/called_workflow.dig, the called workflow must refer to the file queries/data.sql as ../queries/data.sql.

call>: another_workflow.dig
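The working-directory rule can be checked with Ruby's standard Pathname library. This is a minimal sketch, not Digdag code. The helper resolve_from_called_workflow is a hypothetical name, and it simply assumes that the called workflow's directory is the base for relative paths, as described above.

```ruby
require "pathname"

# Hypothetical helper: resolve a path written inside a called workflow,
# assuming the called workflow's directory is its working directory.
def resolve_from_called_workflow(call_target, relative_path)
  working_dir = Pathname.new(call_target).dirname # "common/called_workflow.dig" -> "common"
  (working_dir + relative_path).cleanpath.to_s    # result is relative to the project root
end

puts resolve_from_called_workflow("common/called_workflow.dig", "../queries/data.sql")
# prints queries/data.sql
puts resolve_from_called_workflow("common/called_workflow.dig", "queries/data.sql")
# prints common/queries/data.sql (a different file)
```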

http_call>: Call workflow fetched by HTTP

The http_call> operator makes an HTTP request, parses the response body as a workflow, and embeds it as a subtask. It is similar to the call> operator; the difference is that the other workflow is fetched over HTTP.

This operator parses the response body based on the Content-Type header returned. Content-Type must be set and the following values are supported:

- application/json: Parse the response as JSON.
- application/x-yaml: Use the returned body as is.

If the proper Content-Type header is not returned, use the content_type_override option.

Options

content_type_override: Overrides the Content-Type response header returned by the server. This is useful when the server does not return a proper Content-Type but a generic value such as text/plain or application/octet-stream.



http_call>: https://api.example.com/foobar
content_type_override: application/x-yaml
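The following Ruby sketch roughly illustrates how the Content-Type decides what happens to the body. The method interpret_workflow_body is hypothetical and is not part of Digdag. Stripping parameters such as "; charset=utf-8" is an assumption of the sketch.

```ruby
require "json"

# Hypothetical sketch: decide how to treat an http_call> response body
# from its Content-Type, or from content_type_override when it is given.
def interpret_workflow_body(body, content_type, content_type_override: nil)
  effective = (content_type_override || content_type).to_s # the override wins
  media_type = effective.split(";").first.to_s.strip.downcase # drop "; charset=..." (assumption)

  case media_type
  when "application/json"
    [:json, JSON.parse(body)] # parse the response as JSON
  when "application/x-yaml"
    [:yaml, body]             # use the returned body as is
  else
    raise ArgumentError, "unsupported Content-Type: #{effective.inspect}"
  end
end

# A server that answers text/plain, fixed with content_type_override
p interpret_workflow_body("+step1:\n  echo>: hello\n", "text/plain",
                          content_type_override: "application/x-yaml").first
# prints :yaml
```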


require>: Depends on another workflow

The require> operator requests the completion of another workflow.

This operator is similar to the call> operator, but it does not start the other workflow if that workflow is already running or has already run for the same session time as this workflow.


If the workflow is running or is newly started, this operator waits until it completes. The require> operator can also start a workflow in another project.


workflow1.dig

+step1:
  require>: another_workflow

another_workflow.dig


+step2:
  sh>: echo step2

execution result


$ digdag run workflow1.dig --rerun
2020-07-12 14:55:34 +0900 [INFO](0017@[0:default]+workflow1+step1): require>: another_workflow
2020-07-12 14:55:34 +0900 [INFO](0017@[0:default]+workflow1+step1): Starting a new session project id=1 workflow name=another_workflow session_time=2020-07-11T15:00:00+00:00
2020-07-12 14:55:34 +0900 [INFO](0017@[0:default]+another_workflow+step2): sh>: echo step2
step2

Options

project_id: project_id
project_name: project_name
You can start a workflow in another project by setting project_id or project_name. If the project does not exist, the task fails. If you set both project_id and project_name, the task also fails.



require>: another_project_wf
project_id: 12345

require>: another_project_wf
project_name: another_project

rerun_on: none, failed, all (default: none)
If an attempt of the required workflow already exists, rerun_on controls whether require> actually starts the workflow again.


none: Does not start the workflow if an attempt already exists.
failed: Starts the workflow if an attempt exists and its result was not successful.
all: Starts the workflow regardless of the result of the existing attempt.
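The three rerun_on values can be summarized as a small decision function. This Ruby sketch only models the rules listed above. The name require_starts_new_attempt? is hypothetical. An existing attempt is represented as :success or :failure, and attempts that are still running are not modeled.

```ruby
# Hypothetical model of the rerun_on decision.
# existing_status is nil when no attempt exists, otherwise :success or :failure.
# Attempts that are still running are not modeled here.
def require_starts_new_attempt?(rerun_on, existing_status)
  return true if existing_status.nil? # no attempt yet: start the workflow

  case rerun_on
  when "none"   then false                        # reuse the existing attempt
  when "failed" then existing_status != :success  # rerun only unsuccessful attempts
  when "all"    then true                         # always start again
  else raise ArgumentError, "rerun_on must be none, failed, or all"
  end
end

%w[none failed all].each do |mode|
  puts "#{mode}: #{require_starts_new_attempt?(mode, :failure)}"
end
# none: false
# failed: true
# all: true
```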

ignore_failure: BOOLEAN
By default, this operator fails if the required workflow ends with an error. If ignore_failure: true is set, this operator succeeds even if that workflow ends with an error.

require>: another_workflow
ignore_failure: true

params: MAP
Passes parameters to the workflow specified by require>. The parameters are not passed to any other workflow.

workflow1.dig


+step1:
  require>: another_workflow
  params:
    param_name1: hello

another_workflow.dig


+step2:
  sh>: echo step2:${param_name1}

execution result


$ digdag run workflow1.dig --rerun
2020-07-12 15:19:34 +0900 [INFO](0017@[0:default]+workflow1+step1): require>: another_workflow
2020-07-12 15:19:34 +0900 [INFO](0017@[0:default]+workflow1+step1): Starting a new session project id=1 workflow name=another_workflow session_time=2020-07-11T15:00:00+00:00
2020-07-12 15:19:34 +0900 [INFO](0017@[0:default]+another_workflow+step2): sh>: echo step2:hello
step2:hello
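The following Ruby sketch expands ${param_name1} from the params map given to require>, to show what params does. It is a simplification with stated assumptions. The expand helper is hypothetical and handles only bare ${name} references, whereas Digdag evaluates the contents of ${...} as JavaScript expressions.

```ruby
# Simplified stand-in for ${...} expansion: only bare names are supported.
# A name missing from params raises KeyError.
def expand(template, params)
  template.gsub(/\$\{(\w+)\}/) { params.fetch(Regexp.last_match(1)).to_s }
end

# params: given on the require> task in workflow1.dig
require_params = { "param_name1" => "hello" }

puts expand("echo step2:${param_name1}", require_params)
# prints echo step2:hello
```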

loop>: Repeat tasks

The loop> operator runs the subtask multiple times.

This operator exports the ${i} variable to its subtasks. Its value starts from 0. For example, if the count is 3, the subtasks run with i = 0, i = 1, and i = 2.
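The ${i} values and the moment(session_time).add(...) expressions in the following example can be reproduced with plain Ruby Time arithmetic. This sketch assumes session_time is 2020-07-11T15:00:00Z, which is the value in the run shown in this section. It works in UTC, where adding 86,400 seconds is the same as adding one day.

```ruby
require "time"

# Assumed session_time of the run in this section
session_time = Time.utc(2020, 7, 11, 15, 0, 0)
count = 7 # loop>: 7

# loop> runs the subtasks with i = 0, 1, ..., count - 1
offsets = (0...count).map do |i|
  {
    i: i,
    # like moment(session_time).add(i, 'days'); one UTC day is 86,400 seconds
    days_later: (session_time + i * 86_400).iso8601,
    # like moment(session_time).add(i, 'hours')
    hours_later: (session_time + i * 3_600).iso8601
  }
end

offsets.each do |o|
  puts "i=#{o[:i]}: #{o[:days_later]} (days), #{o[:hours_later]} (hours)"
end
```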

workflow1.dig


+repeat:
  loop>: 7
  _do:
    +step1:
      echo>: ${moment(session_time).add(i,'days')} is ${i} days later than ${session_date}
    +step2:
      echo>: ${moment(session_time).add(i,'hours')} is ${i} hours later than ${session_local_time}.

result


$ digdag run workflow1.dig --rerun

2020-07-12 16:19:04 +0900 [INFO](0017@[0:default]+workflow1+repeat): loop>: 7
2020-07-12 16:19:05 +0900 [INFO](0017@[0:default]+workflow1+repeat^sub+loop-0+step1): echo>: "2020-07-11T15:00:00.000Z" is 0 days later than 2020-07-11
"2020-07-11T15:00:00.000Z" is 0 days later than 2020-07-11
2020-07-12 16:19:06 +0900 [INFO](0017@[0:default]+workflow1+repeat^sub+loop-0+step2): echo>: "2020-07-11T15:00:00.000Z" is 0 hours later than 2020-07-11 15:00:00.
"2020-07-11T15:00:00.000Z" is 0 hours later than 2020-07-11 15:00:00.
2020-07-12 16:19:06 +0900 [INFO](0017@[0:default]+workflow1+repeat^sub+loop-1+step1): echo>: "2020-07-12T15:00:00.000Z" is 1 days later than 2020-07-11
"2020-07-12T15:00:00.000Z" is 1 days later than 2020-07-11
2020-07-12 16:19:07 +0900 [INFO](0017@[0:default]+workflow1+repeat^sub+loop-1+step2): echo>: "2020-07-11T16:00:00.000Z" is 1 hours later than 2020-07-11 15:00:00.
"2020-07-11T16:00:00.000Z" is 1 hours later than 2020-07-11 15:00:00.
2020-07-12 16:19:07 +0900 [INFO](0017@[0:default]+workflow1+repeat^sub+loop-2+step1): echo>: "2020-07-13T15:00:00.000Z" is 2 days later than 2020-07-11
"2020-07-13T15:00:00.000Z" is 2 days later than 2020-07-11
2020-07-12 16:19:07 +0900 [INFO](0017@[0:default]+workflow1+repeat^sub+loop-2+step2): echo>: "2020-07-11T17:00:00.000Z" is 2 hours later than 2020-07-11 15:00:00.
"2020-07-11T17:00:00.000Z" is 2 hours later than 2020-07-11 15:00:00.
2020-07-12 16:19:08 +0900 [INFO](0017@[0:default]+workflow1+repeat^sub+loop-3+step1): echo>: "2020-07-14T15:00:00.000Z" is 3 days later than 2020-07-11
"2020-07-14T15:00:00.000Z" is 3 days later than 2020-07-11
2020-07-12 16:19:08 +0900 [INFO](0017@[0:default]+workflow1+repeat^sub+loop-3+step2): echo>: "2020-07-11T18:00:00.000Z" is 3 hours later than 2020-07-11 15:00:00.
"2020-07-11T18:00:00.000Z" is 3 hours later than 2020-07-11 15:00:00.
2020-07-12 16:19:08 +0900 [INFO](0017@[0:default]+workflow1+repeat^sub+loop-4+step1): echo>: "2020-07-15T15:00:00.000Z" is 4 days later than 2020-07-11
"2020-07-15T15:00:00.000Z" is 4 days later than 2020-07-11
2020-07-12 16:19:09 +0900 [INFO](0017@[0:default]+workflow1+repeat^sub+loop-4+step2): echo>: "2020-07-11T19:00:00.000Z" is 4 hours later than 2020-07-11 15:00:00.
"2020-07-11T19:00:00.000Z" is 4 hours later than 2020-07-11 15:00:00.
2020-07-12 16:19:09 +0900 [INFO](0017@[0:default]+workflow1+repeat^sub+loop-5+step1): echo>: "2020-07-16T15:00:00.000Z" is 5 days later than 2020-07-11
"2020-07-16T15:00:00.000Z" is 5 days later than 2020-07-11
2020-07-12 16:19:09 +0900 [INFO](0017@[0:default]+workflow1+repeat^sub+loop-5+step2): echo>: "2020-07-11T20:00:00.000Z" is 5 hours later than 2020-07-11 15:00:00.
"2020-07-11T20:00:00.000Z" is 5 hours later than 2020-07-11 15:00:00.
2020-07-12 16:19:10 +0900 [INFO](0017@[0:default]+workflow1+repeat^sub+loop-6+step1): echo>: "2020-07-17T15:00:00.000Z" is 6 days later than 2020-07-11
"2020-07-17T15:00:00.000Z" is 6 days later than 2020-07-11
2020-07-12 16:19:10 +0900 [INFO](0017@[0:default]+workflow1+repeat^sub+loop-6+step2): echo>: "2020-07-11T21:00:00.000Z" is 6 hours later than 2020-07-11 15:00:00.
"2020-07-11T21:00:00.000Z" is 6 hours later than 2020-07-11 15:00:00.

Options

■ _parallel: BOOLEAN
Runs the repeated tasks in parallel.

■ _do: TASKS
The tasks executed in each iteration.

for_each>: Repeat tasks for values

The for_each> operator runs a subtask multiple times, once for each combination of values in a set of variables.

workflow1.dig


+repeat:
  for_each>:
    fruit: [apple, orange]
    verb: [eat, throw]
  _do:
    echo>: ${verb} ${fruit}

result


$ digdag run workflow1.dig --rerun
2020-07-12 16:27:00 +0900 [INFO](0017@[0:default]+workflow1+repeat): for_each>: {fruit=[apple, orange], verb=[eat, throw]}
2020-07-12 16:27:01 +0900 [INFO](0017@[0:default]+workflow1+repeat^sub+for-0=fruit=0=apple&1=verb=0=eat): echo>: eat apple
eat apple
2020-07-12 16:27:01 +0900 [INFO](0017@[0:default]+workflow1+repeat^sub+for-0=fruit=0=apple&1=verb=1=throw): echo>: throw apple
throw apple
2020-07-12 16:27:01 +0900 [INFO](0017@[0:default]+workflow1+repeat^sub+for-0=fruit=1=orange&1=verb=0=eat): echo>: eat orange
eat orange
2020-07-12 16:27:02 +0900 [INFO](0017@[0:default]+workflow1+repeat^sub+for-0=fruit=1=orange&1=verb=1=throw): echo>: throw orange
throw orange

Options

■ for_each>: VARIABLES
Variables used in the loop, in key: [value, value, …] syntax. The variables can be given as an object or as a JSON string.

Example 1

for_each>: {i: [1, 2, 3]}

Example 2

for_each>: {i: '[1, 2, 3]'}

■ _parallel: BOOLEAN
Runs the repeated tasks in parallel.

■ _do: TASKS
The tasks to run in each iteration.
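The expansion order in the log above matches a Cartesian product: every combination appears, and the first variable changes slowest. This Ruby sketch models it with Array#product. The helper for_each_combinations is hypothetical. It treats any String value as a JSON array, following Example 2.

```ruby
require "json"

# Hypothetical sketch: expand for_each> variables into one hash per iteration.
def for_each_combinations(variables)
  keys = variables.keys
  # A value may be an array or a JSON string such as '[1, 2, 3]'
  value_lists = variables.values.map { |v| v.is_a?(String) ? JSON.parse(v) : v }
  first, *rest = value_lists
  # Array#product varies the first list slowest, like the log above
  first.product(*rest).map { |combo| keys.zip(combo).to_h }
end

for_each_combinations({ "fruit" => %w[apple orange], "verb" => %w[eat throw] }).each do |vars|
  puts "#{vars["verb"]} #{vars["fruit"]}"
end
# eat apple
# throw apple
# eat orange
# throw orange
```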

for_range>: Repeat tasks for a range

The for_range> operator runs a subtask multiple times, once for each slice of a numeric range.

This operator exports the ${range.from}, ${range.to}, and ${range.index} variables to its subtasks. The index starts from 0.




workflow1.dig

+repeat:
  for_range>:
    from: 10
    to: 50
    step: 10
  _do:
    echo>: processing from ${range.from} to ${range.to}.

result


$ digdag run workflow1.dig --rerun
2020-07-12 16:47:17 +0900 [INFO](0017@[0:default]+workflow1+repeat): for_range>: {from=10, to=50, step=10}
2020-07-12 16:47:18 +0900 [INFO](0017@[0:default]+workflow1+repeat^sub+range-from=10&to=20): echo>: processing from 10 to 20.
processing from 10 to 20.
2020-07-12 16:47:18 +0900 [INFO](0017@[0:default]+workflow1+repeat^sub+range-from=20&to=30): echo>: processing from 20 to 30.
processing from 20 to 30.
2020-07-12 16:47:18 +0900 [INFO](0017@[0:default]+workflow1+repeat^sub+range-from=30&to=40): echo>: processing from 30 to 40.
processing from 30 to 40.
2020-07-12 16:47:19 +0900 [INFO](0017@[0:default]+workflow1+repeat^sub+range-from=40&to=50): echo>: processing from 40 to 50.
processing from 40 to 50.
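The ranges in this log can be derived from from, to, and step. In this Ruby sketch, ranges_by_step is a hypothetical helper. Clamping the last range at to, for the case where step does not divide the range evenly, is an assumption and not documented behavior.

```ruby
# Hypothetical sketch: the ranges for_range> iterates over with from/to/step.
def ranges_by_step(from:, to:, step:)
  raise ArgumentError, "step must be positive" unless step.positive?

  (from...to).step(step).each_with_index.map do |start, index|
    # clamp the last range at `to` (assumption)
    { from: start, to: [start + step, to].min, index: index }
  end
end

ranges_by_step(from: 10, to: 50, step: 10).each do |r|
  puts "processing from #{r[:from]} to #{r[:to]}."
end
# processing from 10 to 20.
# processing from 20 to 30.
# processing from 30 to 40.
# processing from 40 to 50.
```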

Options

■ for_range>: slices
Instead of step, you can set slices to divide the range between from and to into that number of iterations. The size of each slice is computed automatically.

for_range>:
  from: 0
  to: 10
  slices: 3
  # this repeats tasks for 3 times (size of a slice is computed automatically):
  # * {range.from: 0, range.to: 4, range.index: 0}
  # * {range.from: 4, range.to: 8, range.index: 1}
  # * {range.from: 8, range.to: 10, range.index: 2}
_do:
  echo>: from ${range.from} to ${range.to}

■ _parallel: BOOLEAN
Runs the repeated tasks in parallel.

■ _do: TASKS
The tasks to run in each iteration.
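The comment in the slices example gives the expected ranges. This Ruby sketch reproduces them by assuming the slice size is ceil((to - from) / slices). The helper ranges_by_slices is hypothetical, and dropping empty trailing slices is an assumption of the sketch.

```ruby
# Hypothetical sketch: the ranges for_range> produces with slices.
def ranges_by_slices(from:, to:, slices:)
  raise ArgumentError, "slices must be positive" unless slices.positive?

  size = ((to - from) + slices - 1) / slices # integer ceiling division
  (0...slices).map do |index|
    start = from + index * size
    { from: start, to: [start + size, to].min, index: index }
  end.reject { |r| r[:from] >= r[:to] } # drop empty slices (assumption)
end

ranges_by_slices(from: 0, to: 10, slices: 3).each do |r|
  puts "from #{r[:from]} to #{r[:to]} (index #{r[:index]})"
end
# from 0 to 4 (index 0)
# from 4 to 8 (index 1)
# from 8 to 10 (index 2)
```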

if>: Conditional execution

If the value is true, the _do subtasks run; if it is false, the _else_do subtasks run.

workflow1.dig


+run_if_param_is_false:
  if>: ${param}
  _do:
    echo>: ${param} == true
  _else_do:
    echo>: ${param} == false

param_true


$ digdag run workflow1.dig --rerun -p param=true
2020-07-12 17:01:32 +0900 [INFO](0017@[0:default]+workflow1+run_if_param_is_false): if>: true
2020-07-12 17:01:33 +0900 [INFO](0017@[0:default]+workflow1+run_if_param_is_false^sub): echo>: true == true
true == true

param_false


$ digdag run workflow1.dig --rerun -p param=false
2020-07-12 17:01:14 +0900 [INFO](0017@[0:default]+workflow1+run_if_param_is_false): if>: false
2020-07-12 17:01:15 +0900 [INFO](0017@[0:default]+workflow1+run_if_param_is_false^sub): echo>: false == false
false == false
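The branch selection can be sketched in Ruby as follows. The value here comes from -p as text, so the sketch accepts both booleans and the strings "true" and "false". The helper if_branch is hypothetical, and raising an error for any other value is an assumption of the sketch.

```ruby
# Hypothetical sketch: pick the subtask key that if> would run.
def if_branch(condition)
  case condition.to_s
  when "true"  then "_do"
  when "false" then "_else_do"
  else raise ArgumentError, "if> expects true or false, got #{condition.inspect}"
  end
end

puts if_branch("true") # prints _do
puts if_branch(false)  # prints _else_do
```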

fail>: Make the workflow fail

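The fail> operator always fails and makes the workflow fail. It is useful together with if> to stop a workflow when a validation check does not pass.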

+fail_if_too_few:
  if>: ${count < 10}
  _do:
    fail>: count is less than 10!

count_11


$ digdag run workflow1.dig --rerun -p count=11
2020-07-12 17:05:52 +0900: Digdag v0.9.41
2020-07-12 17:05:54 +0900 [WARN](main): Reusing the last session time 2020-07-11T15:00:00+00:00.
2020-07-12 17:05:54 +0900 [INFO](main): Using session /Users/akira/Desktop/ruby/sample/workflows/.digdag/status/20200711T150000+0000.
2020-07-12 17:05:54 +0900 [INFO](main): Starting a new session project id=1 workflow name=workflow1 session_time=2020-07-11T15:00:00+00:00
2020-07-12 17:05:55 +0900 [INFO](0017@[0:default]+workflow1+fail_if_too_few): if>: false

count_9


$ digdag run workflow1.dig --rerun -p count=9
2020-07-12 17:05:46 +0900 [INFO](0017@[0:default]+workflow1+fail_if_too_few): if>: true
2020-07-12 17:05:47 +0900 [INFO](0017@[0:default]+workflow1+fail_if_too_few^sub): fail>: count is less than 10!
2020-07-12 17:05:47 +0900 [ERROR](0017@[0:default]+workflow1+fail_if_too_few^sub): Task +workflow1+fail_if_too_few^sub failed.
count is less than 10!
2020-07-12 17:05:47 +0900 [INFO](0017@[0:default]+workflow1^failure-alert): type: notify
error:
  * +workflow1+fail_if_too_few^sub:
    count is less than 10!
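The +fail_if_too_few task combines if> and fail>. The following is a Ruby model of that control flow. It takes count as an Integer and uses a hypothetical WorkflowFailed error to stand in for a failed task.

```ruby
# Hypothetical stand-in for a failed Digdag task
class WorkflowFailed < StandardError; end

def fail_if_too_few(count)
  return :skipped unless count < 10              # if>: false, fail> never runs
  raise WorkflowFailed, "count is less than 10!" # fail> always fails
end

p fail_if_too_few(11) # prints :skipped
begin
  fail_if_too_few(9)
rescue WorkflowFailed => e
  puts "failed: #{e.message}" # prints failed: count is less than 10!
end
```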

echo>: Shows a message

The echo> operator prints a message.

+say_hello:
  echo>: Hello world!