This article is for day 23 of the CyberAgent Developers Advent Calendar 2020.
CDK is a wrapper around the CloudFormation API that lets you do AWS configuration management with the cdk command (a Node.js CLI) and programming languages such as TypeScript and Python.
Because you can write in a language you are already used to, it is approachable for server-side engineers who are not comfortable with configuration management in YAML or JSON, and it is a good fit for projects where you want to leave infrastructure management to the server-side engineers.
In addition, because constructs can be abstracted and standardized, a wide range of patterns can be expressed.
This time, I would like to cover managing Step Functions configuration with the CDK, from the basics to some slightly more advanced parts.
CDK + Python + StepFunctions
For the basics of the CDK itself, refer to the getting-started guide on GitHub and similar resources; this article focuses on the parts that handle Step Functions.
First, the basic pattern looks like this.
from aws_cdk import (
    core,
    aws_lambda as lambdas,
    aws_stepfunctions_tasks as tasks,
    aws_stepfunctions as sf
)


class SampleStack(core.Stack):
    def __init__(self,
                 scope: core.Construct,
                 id: str,
                 **kwargs) -> None:
        super().__init__(scope, id, **kwargs)

        hello_lambda: lambdas.Function = ...  # Lambda definition
        hello_task: tasks.LambdaInvoke = tasks.LambdaInvoke(
            self,
            "[Lambda] hello",
            lambda_function=hello_lambda)

        world_lambda: lambdas.Function = ...  # Lambda definition
        world_task: tasks.LambdaInvoke = tasks.LambdaInvoke(
            self,
            "[Lambda] world",
            lambda_function=world_lambda)

        definition: sf.Chain = hello_task.next(world_task)

        sf.StateMachine(self, "hello_workflow", definition=definition)
The code above does the following: it defines a tasks.LambdaInvoke task for each Lambda, chains the tasks with .next() to build the workflow definition, and creates the state machine with sf.StateMachine.

Passing results between tasks is written as follows.
from aws_cdk import (
    core,
    aws_lambda as lambdas,
    aws_stepfunctions_tasks as tasks,
    aws_stepfunctions as sf
)


class SampleStack(core.Stack):
    def __init__(self,
                 scope: core.Construct,
                 id: str,
                 **kwargs) -> None:
        super().__init__(scope, id, **kwargs)

        hello_lambda: lambdas.Function = ...  # Lambda definition
        hello_task: tasks.LambdaInvoke = tasks.LambdaInvoke(
            self,
            "[Lambda] hello",
            lambda_function=hello_lambda,
            result_path="$.helloLambda")

        world_lambda: lambdas.Function = ...  # Lambda definition
        world_task: tasks.LambdaInvoke = tasks.LambdaInvoke(
            self,
            "[Lambda] world",
            lambda_function=world_lambda,
            payload=sf.TaskInput.from_json_path_at("$.helloLambda.Payload"))

        definition: sf.Chain = sf.Chain.start(hello_task).next(world_task)

        sf.StateMachine(self, "hello_workflow", definition=definition)
In Step Functions, specifying result_path adds a task's execution result to the state JSON.
Written this way, the execution result of hello_task is passed to world_task.
For more information, see the Step Functions official documentation (https://docs.aws.amazon.com/ja_jp/step-functions/latest/dg/input-output-resultpath.html).
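The effect of result_path can be sketched in plain Python. This is a simplified model of the merge behavior, handling only single-level "$.key" paths rather than real JSONPath:

```python
import copy
from typing import Optional


def apply_result_path(state_input: dict, result: dict,
                      result_path: Optional[str]) -> dict:
    """Simplified model of Step Functions ResultPath handling.

    - result_path of None or "$" (the default) replaces the whole input.
    - result_path of "$.key" keeps the input and adds the result under "key".
    Only single-level paths are modeled; real JSONPath is richer.
    """
    if result_path is None or result_path == "$":
        return result
    output = copy.deepcopy(state_input)
    output[result_path[2:]] = result  # assumes a "$.key" style path
    return output


state_input = {"date": "2020/12/23"}
lambda_result = {"Payload": {"message": "hello"}}

# With result_path="$.helloLambda", the original input survives alongside
# the Lambda result, which is what lets world_task read it afterwards.
merged = apply_result_path(state_input, lambda_result, "$.helloLambda")
```

The None/"$" case corresponds to forgetting result_path: the result replaces the entire state, discarding everything computed before.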
By using add_catch, you can easily send error notifications.
It is recommended to pass along $$.Execution.Id, $.Error, $.Cause, etc., because they make it easy to investigate the cause and re-execute.
from aws_cdk import (
    core,
    aws_lambda as lambdas,
    aws_stepfunctions_tasks as tasks,
    aws_stepfunctions as sf
)


class SampleStack(core.Stack):
    def __init__(self,
                 scope: core.Construct,
                 id: str,
                 **kwargs) -> None:
        super().__init__(scope, id, **kwargs)

        hello_lambda: lambdas.Function = ...  # Lambda definition
        hello_task: tasks.LambdaInvoke = tasks.LambdaInvoke(
            self,
            "[Lambda] hello",
            lambda_function=hello_lambda,
            result_path="$.helloLambda")

        world_lambda: lambdas.Function = ...  # Lambda definition
        world_task: tasks.LambdaInvoke = tasks.LambdaInvoke(
            self,
            "[Lambda] world",
            lambda_function=world_lambda,
            payload=sf.TaskInput.from_json_path_at("$.helloLambda.Payload"))

        notification_error: lambdas.Function = ...  # Lambda definition
        execution_id: str = sf.TaskInput.from_data_at("$$.Execution.Id").value
        err: str = sf.TaskInput.from_data_at("$.Error").value
        cause: str = sf.TaskInput.from_data_at("$.Cause").value
        notification_error_task: tasks.LambdaInvoke = tasks.LambdaInvoke(
            self,
            "[Lambda] notification_error",
            lambda_function=notification_error,
            payload=sf.TaskInput.from_object({
                "execution_id": execution_id,
                "error": err,
                "cause": cause
            }))

        job_failed: sf.Fail = sf.Fail(
            self,
            "Job Failed",
            cause="Job Failed",
            error="Workflow FAILED")

        error_handler: sf.Chain = notification_error_task.next(job_failed)
        hello_task.add_catch(error_handler, errors=['States.ALL'])
        world_task.add_catch(error_handler, errors=['States.ALL'])

        definition: sf.Chain = sf.Chain.start(hello_task).next(world_task)

        sf.StateMachine(self, "hello_workflow", definition=definition)
There are some points to keep in mind.

- If you do not chain sf.Fail with .next() after the error-handling task, the execution will be judged as successful on Step Functions.
- If you write notification_error_task.next(job_failed) in more than one place, you will get Error: State '[Lambda] notification_error' already has a next state.

You can handle errors with the method above, but using sf.Parallel lets you write it a little more cleanly.
from aws_cdk import (
    core,
    aws_lambda as lambdas,
    aws_stepfunctions_tasks as tasks,
    aws_stepfunctions as sf
)


class SampleStack(core.Stack):
    def __init__(self,
                 scope: core.Construct,
                 id: str,
                 **kwargs) -> None:
        super().__init__(scope, id, **kwargs)

        hello_lambda: lambdas.Function = ...  # Lambda definition
        hello_task: tasks.LambdaInvoke = tasks.LambdaInvoke(
            self,
            "[Lambda] hello",
            lambda_function=hello_lambda,
            result_path="$.helloLambda")

        world_lambda: lambdas.Function = ...  # Lambda definition
        world_task: tasks.LambdaInvoke = tasks.LambdaInvoke(
            self,
            "[Lambda] world",
            lambda_function=world_lambda,
            payload=sf.TaskInput.from_json_path_at("$.helloLambda.Payload"))

        notification_error: lambdas.Function = ...  # Lambda definition
        execution_id: str = sf.TaskInput.from_data_at("$$.Execution.Id").value
        err: str = sf.TaskInput.from_data_at("$.Error").value
        cause: str = sf.TaskInput.from_data_at("$.Cause").value
        notification_error_task: tasks.LambdaInvoke = tasks.LambdaInvoke(
            self,
            "[Lambda] notification_error",
            lambda_function=notification_error,
            payload=sf.TaskInput.from_object({
                "execution_id": execution_id,
                "error": err,
                "cause": cause
            }))

        job_failed: sf.Fail = sf.Fail(
            self,
            "Job Failed",
            cause="Job Failed",
            error="Workflow FAILED")

        definition: sf.Parallel = sf.Chain.start(hello_task).next(world_task).to_single_state("definition")
        definition.add_catch(notification_error_task.next(job_failed))

        sf.StateMachine(self, "hello_workflow", definition=definition)
Defined this way, the catch fires if any of the tasks inside the Parallel fails. Note that a Parallel state outputs an array of its branch results.

Conditional branching can be implemented with sf.Choice.
It looks like it would get hard to read with many conditional branches, but I suspect that is a deliberate design choice.
from aws_cdk import (
    core,
    aws_lambda as lambdas,
    aws_stepfunctions_tasks as tasks,
    aws_stepfunctions as sf
)


class SampleStack(core.Stack):
    def __init__(self,
                 scope: core.Construct,
                 id: str,
                 **kwargs) -> None:
        super().__init__(scope, id, **kwargs)

        hello_or_world: lambdas.Function = ...  # Lambda definition
        hello_or_world_task: tasks.LambdaInvoke = tasks.LambdaInvoke(
            self,
            "[Lambda] hello or world",
            lambda_function=hello_or_world,
            result_path="$.helloOrWorld")

        hello_lambda: lambdas.Function = ...  # Lambda definition
        hello_task: tasks.LambdaInvoke = tasks.LambdaInvoke(
            self,
            "[Lambda] hello",
            lambda_function=hello_lambda)

        world_lambda: lambdas.Function = ...  # Lambda definition
        world_task: tasks.LambdaInvoke = tasks.LambdaInvoke(
            self,
            "[Lambda] world",
            lambda_function=world_lambda)

        job_failed: sf.Fail = sf.Fail(
            self,
            "Job Failed",
            cause="Job Failed",
            error="Workflow FAILED")

        definition: sf.Chain = sf.Chain.start(hello_or_world_task)\
            .next(
                sf.Choice(self, "hello or world ?")
                .when(sf.Condition.string_equals("$.helloOrWorld.Payload", "hello"), hello_task)
                .when(sf.Condition.string_equals("$.helloOrWorld.Payload", "world"), world_task)
                .otherwise(job_failed)
            )

        sf.StateMachine(self, "hello_workflow", definition=definition)
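The Choice conditions compare against $.helloOrWorld.Payload because LambdaInvoke wraps the function's return value in a Payload field. For reference, here is a minimal hypothetical handler for the hello_or_world Lambda (the branching logic is an assumption for illustration, not from the original article):

```python
import random


def handler(event, context):
    # Hypothetical hello_or_world Lambda: whatever it returns becomes
    # $.helloOrWorld.Payload once LambdaInvoke stores its result.
    # Random choice is just for illustration; real logic would inspect `event`.
    return random.choice(["hello", "world"])


branch = handler({}, None)  # always one of the two strings the Choice expects
```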

It's very simple, but be aware that if you inadvertently forget to specify result_path, all the results computed in the previous steps will be overwritten.
This was the most troublesome part I ran into this time. When you run an EMR step from Step Functions (EmrAddStep), you pass the arguments as an array:
args=[
    "spark-submit",
    "--deploy-mode",
    "cluster",
    "--master",
    "yarn",
    "--class",
    "Main",
    "hoge.jar",
    "2020/12/23/01",
    "--tz",
    "utc"
]
For example, if you try to pass the processing result of the previous step inside that array, you get an error, because the parameter is typed as args: typing.Optional[typing.List[str]] and only accepts literal strings.
So I first put a placeholder string in,
args=[
    "<$.hoge.Payload>"
],
and later converted the definition to a string and replaced the placeholder with a JSONPath reference to the result formatted by Lambda. (Please let me know if there is a better way.)
prepare_workflow: sf.Chain = sf.Chain.start(emr_create_cluster).next(emr_add_step)
definition: sf.Parallel = sf.Parallel(self, id="definition")
definition.branch(prepare_workflow)
definition_json = definition.to_state_json()["Branches"][0]
definition_str = json.dumps(definition_json) \
    .replace('"Args": ["<$.hoge.Payload>"]',
             '"Args.$": "$.hoge.Payload"', 1)