Learn Digdag from Digdag official documentation-Operators ① Workflow control operators

Target

A translation (+α) of the Operators page of the official Digdag documentation. The end goal of this series is to implement a batch job in Rails using Digdag's Ruby API.
http://docs.digdag.io/operators/workflow_control.html

Table of contents

- Getting started
- Architecture
- Concepts
- Workflow definition
- Scheduling workflow
- Operators
- Command reference
- Language API - Ruby
- Change the setting value for each environment with Digdag (RubyOnRails)
- Batch implementation in RubyOnRails environment using Digdag

Operators

Workflow control operators

call>: Call another workflow

workflow1.dig


timezone: Asia/Tokyo

+step1:
  call>: another_workflow.dig
+step2:
  call>: common/shared_workflow.dig

another_workflow.dig


+step1:
    sh>: echo hi! another_workflow.dig

common/shared_workflow.dig


+step1:
    sh>: echo hi! ./common/shared_workflow.dig

result


$ digdag run workflow1.dig --rerun
2020-07-12 13:38:54 +0900 [INFO]([email protected][0:default]+workflow1+step1): call>: another_workflow.dig
2020-07-12 13:38:54 +0900 [INFO]([email protected][0:default]+workflow1+step1^sub+step1): sh>: echo hi! another_workflow.dig
hi! another_workflow.dig
2020-07-12 13:38:54 +0900 [INFO]([email protected][0:default]+workflow1+step2): call>: common/shared_workflow.dig
2020-07-12 13:38:55 +0900 [INFO]([email protected][0:default]+workflow1+step2^sub+step1): sh>: echo hi! ./common/shared_workflow.dig
hi! ./common/shared_workflow.dig

call>: FILE
FILE is the path to the workflow definition file, and the file name must end with `.dig`. If the called workflow is in a subdirectory, it uses that subdirectory as its working directory. For example, if a task defines `call>: common/called_workflow.dig`, the called workflow refers to the `queries/data.sql` file as `../queries/data.sql`.

call>: another_workflow.dig
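To make the working-directory rule concrete, here is a minimal sketch (the file and task names are hypothetical) of a caller that references a workflow in a `common/` subdirectory; inside the called workflow, files that sit next to the caller must be reached with `../`:

```
# Project layout (hypothetical):
#   workflow1.dig
#   queries/data.sql
#   common/called_workflow.dig

# workflow1.dig
+load:
  call>: common/called_workflow.dig

# common/called_workflow.dig -- its working directory is common/,
# so the SQL file one level up is referenced with ../
+show_path:
  sh>: cat ../queries/data.sql
```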

http_call>: Call a workflow fetched over HTTP

The `http_call>` operator makes an HTTP request, parses the response body as a workflow, and embeds it as a subtask. It is similar to the `call>` operator; the difference is that the other workflow is fetched over HTTP.

This operator parses the response body based on the returned Content-Type header. Content-Type must be set, and the following values are supported:

- `application/json`: Parse the response as JSON.
- `application/x-yaml`: Use the returned body as-is.

If the server does not return an appropriate Content-Type header, use the `content_type_override` option.

**Options**

content_type_override: Overrides the Content-Type response header returned by the server. This option is useful when the server does not return the proper Content-Type but instead returns a generic value such as `text/plain` or `application/octet-stream`.

http_call>: https://api.example.com/foobar
content_type_override: application/x-yaml


**require>: Depends on another workflow**

The `require>` operator requires that another workflow has completed. It is similar to the `call>` operator, but it does not start the other workflow if that workflow is already running, or has already run for the same session time as this workflow.
If the workflow is running or newly started, this operator waits for it to complete. The `require>` operator can also start a workflow in another project.


#### **`workflow1.dig`**
```
+step1:
  require>: another_workflow
```

another_workflow.dig


+step2:
  sh>: echo step2

Execution result


$ digdag run workflow1.dig --rerun
2020-07-12 14:55:34 +0900 [INFO]([email protected][0:default]+workflow1+step1): require>: another_workflow
2020-07-12 14:55:34 +0900 [INFO]([email protected][0:default]+workflow1+step1): Starting a new session project id=1 workflow name=another_workflow session_time=2020-07-11T15:00:00+00:00
2020-07-12 14:55:34 +0900 [INFO]([email protected][0:default]+another_workflow+step2): sh>: echo step2
step2

Options

project_id: NUMBER / project_name: NAME
You can start a workflow in another project by setting either `project_id` or `project_name`. If the specified project does not exist, the task fails. If you set both `project_id` and `project_name`, the task also fails.

Example (using `project_id`):

require>: another_project_wf
project_id: 12345

Example (using `project_name`):

require>: another_project_wf
project_name: another_project

rerun_on: none, failed, or all (default: none)
When attempts of the dependent workflow already exist, `rerun_on` controls whether `require>` actually starts the workflow.
- none: Do not start the workflow if an attempt already exists.
- failed: Start the workflow only if an attempt exists and its result was a failure.
- all: Start the workflow regardless of the result of existing attempts.
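For example, a dependency that should be retried only when the previous attempt failed could be written like this (a minimal sketch; `another_workflow` is the workflow name used in the examples above):

```
+step1:
  require>: another_workflow
  rerun_on: failed
```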

ignore_failure: BOOLEAN
By default, this operator fails if the dependent workflow ends with an error. If `ignore_failure: true` is set, this operator succeeds even when the workflow ends with an error.

require>: another_workflow
ignore_failure: true

params: MAP
This operator passes the given parameters to the workflow specified by `require>`. The parameters are not passed on to any further workflows.

workflow1.dig


+step1:
  require>: another_workflow
  params:
    param_name1: hello

another_workflow.dig


+step2:
  sh>: echo step2:${param_name1}

Execution result


$ digdag run workflow1.dig --rerun
2020-07-12 15:19:34 +0900 [INFO]([email protected][0:default]+workflow1+step1): require>: another_workflow
2020-07-12 15:19:34 +0900 [INFO]([email protected][0:default]+workflow1+step1): Starting a new session project id=1 workflow name=another_workflow session_time=2020-07-11T15:00:00+00:00
2020-07-12 15:19:34 +0900 [INFO]([email protected][0:default]+another_workflow+step2): sh>: echo step2:hello
step2:hello

loop>: Repeat tasks

The `loop>` operator runs its subtasks multiple times.

This operator exports the `${i}` variable to the subtasks. Its value starts at 0. For example, if the count is 3, the tasks run with i = 0, i = 1, and i = 2.

workflow1.dig


+repeat:
  loop>: 7
  _do:
    +step1:
      echo>: ${moment(session_time).add(i, 'days')} is ${i} days later than ${session_date}
    +step2:
      echo>: ${moment(session_time).add(i, 'hours')} is ${i} hours later than ${session_local_time}.

result


$ digdag run workflow1.dig --rerun

2020-07-12 16:19:04 +0900 [INFO]([email protected][0:default]+workflow1+repeat): loop>: 7
2020-07-12 16:19:05 +0900 [INFO]([email protected][0:default]+workflow1+repeat^sub+loop-0+step1): echo>: "2020-07-11T15:00:00.000Z" is 0 days later than 2020-07-11
"2020-07-11T15:00:00.000Z" is 0 days later than 2020-07-11
2020-07-12 16:19:06 +0900 [INFO]([email protected][0:default]+workflow1+repeat^sub+loop-0+step2): echo>: "2020-07-11T15:00:00.000Z" is 0 hours later than 2020-07-11 15:00:00.
"2020-07-11T15:00:00.000Z" is 0 hours later than 2020-07-11 15:00:00.
2020-07-12 16:19:06 +0900 [INFO]([email protected][0:default]+workflow1+repeat^sub+loop-1+step1): echo>: "2020-07-12T15:00:00.000Z" is 1 days later than 2020-07-11
"2020-07-12T15:00:00.000Z" is 1 days later than 2020-07-11
2020-07-12 16:19:07 +0900 [INFO]([email protected][0:default]+workflow1+repeat^sub+loop-1+step2): echo>: "2020-07-11T16:00:00.000Z" is 1 hours later than 2020-07-11 15:00:00.
"2020-07-11T16:00:00.000Z" is 1 hours later than 2020-07-11 15:00:00.
2020-07-12 16:19:07 +0900 [INFO]([email protected][0:default]+workflow1+repeat^sub+loop-2+step1): echo>: "2020-07-13T15:00:00.000Z" is 2 days later than 2020-07-11
"2020-07-13T15:00:00.000Z" is 2 days later than 2020-07-11
2020-07-12 16:19:07 +0900 [INFO]([email protected][0:default]+workflow1+repeat^sub+loop-2+step2): echo>: "2020-07-11T17:00:00.000Z" is 2 hours later than 2020-07-11 15:00:00.
"2020-07-11T17:00:00.000Z" is 2 hours later than 2020-07-11 15:00:00.
2020-07-12 16:19:08 +0900 [INFO]([email protected][0:default]+workflow1+repeat^sub+loop-3+step1): echo>: "2020-07-14T15:00:00.000Z" is 3 days later than 2020-07-11
"2020-07-14T15:00:00.000Z" is 3 days later than 2020-07-11
2020-07-12 16:19:08 +0900 [INFO]([email protected][0:default]+workflow1+repeat^sub+loop-3+step2): echo>: "2020-07-11T18:00:00.000Z" is 3 hours later than 2020-07-11 15:00:00.
"2020-07-11T18:00:00.000Z" is 3 hours later than 2020-07-11 15:00:00.
2020-07-12 16:19:08 +0900 [INFO]([email protected][0:default]+workflow1+repeat^sub+loop-4+step1): echo>: "2020-07-15T15:00:00.000Z" is 4 days later than 2020-07-11
"2020-07-15T15:00:00.000Z" is 4 days later than 2020-07-11
2020-07-12 16:19:09 +0900 [INFO]([email protected][0:default]+workflow1+repeat^sub+loop-4+step2): echo>: "2020-07-11T19:00:00.000Z" is 4 hours later than 2020-07-11 15:00:00.
"2020-07-11T19:00:00.000Z" is 4 hours later than 2020-07-11 15:00:00.
2020-07-12 16:19:09 +0900 [INFO]([email protected][0:default]+workflow1+repeat^sub+loop-5+step1): echo>: "2020-07-16T15:00:00.000Z" is 5 days later than 2020-07-11
"2020-07-16T15:00:00.000Z" is 5 days later than 2020-07-11
2020-07-12 16:19:09 +0900 [INFO]([email protected][0:default]+workflow1+repeat^sub+loop-5+step2): echo>: "2020-07-11T20:00:00.000Z" is 5 hours later than 2020-07-11 15:00:00.
"2020-07-11T20:00:00.000Z" is 5 hours later than 2020-07-11 15:00:00.
2020-07-12 16:19:10 +0900 [INFO]([email protected][0:default]+workflow1+repeat^sub+loop-6+step1): echo>: "2020-07-17T15:00:00.000Z" is 6 days later than 2020-07-11
"2020-07-17T15:00:00.000Z" is 6 days later than 2020-07-11
2020-07-12 16:19:10 +0900 [INFO]([email protected][0:default]+workflow1+repeat^sub+loop-6+step2): echo>: "2020-07-11T21:00:00.000Z" is 6 hours later than 2020-07-11 15:00:00.
"2020-07-11T21:00:00.000Z" is 6 hours later than 2020-07-11 15:00:00.

Options

_parallel: BOOLEAN
Runs the subtasks in parallel (e.g. `_parallel: true`; default: false).

_do: TASKS
The tasks to run in the loop.
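As a sketch, the `loop>` example above could run its iterations concurrently by adding the `_parallel` option (same structure as before, simplified):

```
+repeat:
  loop>: 3
  _parallel: true
  _do:
    echo>: iteration ${i}
```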

**for_each>: Repeat tasks for values**

The `for_each>` operator runs subtasks multiple times, once for each combination of a set of variables.

#### **`workflow1.dig`**

+repeat:
  for_each>:
    fruit: [apple, orange]
    verb: [eat, throw]
  _do:
    echo>: ${verb} ${fruit}

result


$ digdag run workflow1.dig --rerun
2020-07-12 16:27:00 +0900 [INFO]([email protected][0:default]+workflow1+repeat): for_each>: {fruit=[apple, orange], verb=[eat, throw]}
2020-07-12 16:27:01 +0900 [INFO]([email protected][0:default]+workflow1+repeat^sub+for-0=fruit=0=apple&1=verb=0=eat): echo>: eat apple
eat apple
2020-07-12 16:27:01 +0900 [INFO]([email protected][0:default]+workflow1+repeat^sub+for-0=fruit=0=apple&1=verb=1=throw): echo>: throw apple
throw apple
2020-07-12 16:27:01 +0900 [INFO]([email protected][0:default]+workflow1+repeat^sub+for-0=fruit=1=orange&1=verb=0=eat): echo>: eat orange
eat orange
2020-07-12 16:27:02 +0900 [INFO]([email protected][0:default]+workflow1+repeat^sub+for-0=fruit=1=orange&1=verb=1=throw): echo>: throw orange
throw orange

Options

for_each>: VARIABLES
The variables to loop over, using `key: [value, value, ...]` syntax. Variables can be objects or JSON strings.

Example 1


for_each>: {i: [1, 2, 3]}

Example 2


for_each>: {i: '[1, 2, 3]'}

_parallel: BOOLEAN
Runs the repeated tasks in parallel.

_do: TASKS
The tasks to repeat.

for_range>: Repeat tasks for a range

The `for_range>` operator runs subtasks multiple times over a range of values.

This operator exports the `${range.from}`, `${range.to}`, and `${range.index}` variables to the subtasks. The index starts at 0.

workflow1.dig


+repeat:
  for_range>:
    from: 10
    to: 50
    step: 10
  _do:
    echo>: processing from ${range.from} to ${range.to}.

result


$ digdag run workflow1.dig --rerun
2020-07-12 16:47:17 +0900 [INFO]([email protected][0:default]+workflow1+repeat): for_range>: {from=10, to=50, step=10}
2020-07-12 16:47:18 +0900 [INFO]([email protected][0:default]+workflow1+repeat^sub+range-from=10&to=20): echo>: processing from 10 to 20.
processing from 10 to 20.
2020-07-12 16:47:18 +0900 [INFO]([email protected][0:default]+workflow1+repeat^sub+range-from=20&to=30): echo>: processing from 20 to 30.
processing from 20 to 30.
2020-07-12 16:47:18 +0900 [INFO]([email protected][0:default]+workflow1+repeat^sub+range-from=30&to=40): echo>: processing from 30 to 40.
processing from 30 to 40.
2020-07-12 16:47:19 +0900 [INFO]([email protected][0:default]+workflow1+repeat^sub+range-from=40&to=50): echo>: processing from 40 to 50.
processing from 40 to 50.

Options

slices: NUMBER
Divides the range into the given number of slices and repeats the tasks once per slice.

for_range>:
  from: 0
  to: 10
  slices: 3
  # this repeats tasks for 3 times (size of a slice is computed automatically):
  #  * {range.from: 0, range.to: 4, range.index: 0}
  #  * {range.from: 4, range.to: 8, range.index: 1}
  #  * {range.from: 8, range.to: 10, range.index: 2}
_do:
  echo>: from ${range.from} to ${range.to}

_parallel: BOOLEAN
Runs the repeated tasks in parallel.

_do: TASKS
The tasks to repeat.

if>: Conditional execution

If the condition is true, the subtasks under `_do` run; if false, the subtasks under `_else_do` run.

workflow1.dig


+run_if_param_is_false:
  if>: ${param}
  _do:
    echo>: ${param} == true
  _else_do:
    echo>: ${param} == false

param_true


$ digdag run workflow1.dig --rerun -p param=true
2020-07-12 17:01:32 +0900 [INFO]([email protected][0:default]+workflow1+run_if_param_is_false): if>: true
2020-07-12 17:01:33 +0900 [INFO]([email protected][0:default]+workflow1+run_if_param_is_false^sub): echo>: true == true
true == true

param_false


$ digdag run workflow1.dig --rerun -p param=false
2020-07-12 17:01:14 +0900 [INFO]([email protected][0:default]+workflow1+run_if_param_is_false): if>: false
2020-07-12 17:01:15 +0900 [INFO](00[email protected][0:default]+workflow1+run_if_param_is_false^sub): echo>: false == false
false == false

fail>: Makes the workflow fail

Use this to fail the workflow when a check does not pass.

+fail_if_too_few:
  if>: ${count < 10}
  _do:
    fail>: count is less than 10!

count_11


$ digdag run workflow1.dig --rerun -p count=11
2020-07-12 17:05:52 +0900: Digdag v0.9.41
2020-07-12 17:05:54 +0900 [WARN](main): Reusing the last session time 2020-07-11T15:00:00+00:00.
2020-07-12 17:05:54 +0900 [INFO](main): Using session /Users/akira/Desktop/ruby/sample/workflows/.digdag/status/20200711T150000+0000.
2020-07-12 17:05:54 +0900 [INFO](main): Starting a new session project id=1 workflow name=workflow1 session_time=2020-07-11T15:00:00+00:00
2020-07-12 17:05:55 +0900 [INFO]([email protected][0:default]+workflow1+fail_if_too_few): if>: false

count_9


$ digdag run workflow1.dig --rerun -p count=9
2020-07-12 17:05:46 +0900 [INFO]([email protected][0:default]+workflow1+fail_if_too_few): if>: true
2020-07-12 17:05:47 +0900 [INFO]([email protected][0:default]+workflow1+fail_if_too_few^sub): fail>: count is less than 10!
2020-07-12 17:05:47 +0900 [ERROR]([email protected][0:default]+workflow1+fail_if_too_few^sub): Task +workflow1+fail_if_too_few^sub failed.
count is less than 10!
2020-07-12 17:05:47 +0900 [INFO]([email protected][0:default]+workflow1^failure-alert): type: notify
error: 
  * +workflow1+fail_if_too_few^sub:
    count is less than 10!

echo>: Shows a message

Outputs a message.

+say_hello:
  echo>: Hello world!
