This article is the Day 23 entry in the CyberAgent Developers Advent Calendar 2020.
AWS CDK is a framework on top of CloudFormation that lets you define and manage AWS infrastructure in programming languages such as TypeScript and Python.
Because you can write in a language you are already used to, it is approachable for server-side engineers who are not accustomed to configuration management in YAML or JSON, and it is a good fit for projects where you want server-side engineers to own the infrastructure.
In addition, since constructs can be abstracted and standardized, a wide range of patterns can be expressed.
This time, I would like to cover managing Step Functions with the CDK, from the rudimentary parts to some slightly deeper topics.
CDK + Python + StepFunctions
For the basics of the CDK itself, refer to the getting-started guide on GitHub and similar resources; this article focuses on the parts that handle Step Functions.
First, the basic pattern looks like this:
from aws_cdk import (
    core,
    aws_lambda as lambdas,
    aws_stepfunctions_tasks as tasks,
    aws_stepfunctions as sf
)


class SampleStack(core.Stack):
    def __init__(self,
                 scope: core.Construct,
                 id: str,
                 **kwargs) -> None:
        super().__init__(scope, id, **kwargs)
        hello_lambda: lambdas.Function = ...  # Lambda definition
        hello_task: tasks.LambdaInvoke = tasks.LambdaInvoke(
            self,
            "[Lambda] hello",
            lambda_function=hello_lambda)
        world_lambda: lambdas.Function = ...  # Lambda definition
        world_task: tasks.LambdaInvoke = tasks.LambdaInvoke(
            self,
            "[Lambda] world",
            lambda_function=world_lambda)
        definition: sf.Chain = hello_task.next(world_task)
        sf.StateMachine(self, "hello_workflow", definition=definition)
The code above defines each Lambda as a LambdaInvoke task, chains the tasks with next(), and creates the state machine from the resulting Chain.
Passing a result from one task to the next is written as follows:
from aws_cdk import (
    core,
    aws_lambda as lambdas,
    aws_stepfunctions_tasks as tasks,
    aws_stepfunctions as sf
)


class SampleStack(core.Stack):
    def __init__(self,
                 scope: core.Construct,
                 id: str,
                 **kwargs) -> None:
        super().__init__(scope, id, **kwargs)
        hello_lambda: lambdas.Function = ...  # Lambda definition
        hello_task: tasks.LambdaInvoke = tasks.LambdaInvoke(
            self,
            "[Lambda] hello",
            lambda_function=hello_lambda,
            result_path="$.helloLambda")
        world_lambda: lambdas.Function = ...  # Lambda definition
        world_task: tasks.LambdaInvoke = tasks.LambdaInvoke(
            self,
            "[Lambda] world",
            lambda_function=world_lambda,
            payload=sf.TaskInput.from_json_path_at("$.helloLambda.Payload"))
        definition: sf.Chain = sf.Chain.start(hello_task).next(world_task)
        sf.StateMachine(self, "hello_workflow", definition=definition)
In Step Functions, specifying result_path merges a task's execution result into the state JSON at the given path instead of replacing the input.
Written this way, the execution result of hello_task is passed on to world_task.
For more information, see Step Functions Official Documentation (https://docs.aws.amazon.com/ja_jp/step-functions/latest/dg/input-output-resultpath.html).
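To make the ResultPath behavior concrete, here is a minimal sketch in plain Python (not the CDK) of how Step Functions combines a task result with the state input. `apply_result_path` is a hypothetical helper for illustration only, and it handles only single-level paths like `"$.helloLambda"`:

```python
from typing import Optional


def apply_result_path(state_input: dict, task_result: dict,
                      result_path: Optional[str]) -> dict:
    """Illustrates Step Functions ResultPath semantics (simplified)."""
    if result_path is None:
        # Default behavior: the task result replaces the entire state.
        return task_result
    # With a ResultPath, the result is inserted into the input at that path,
    # and the rest of the input is preserved.
    key = result_path[len("$."):]  # "$.helloLambda" -> "helloLambda"
    output = dict(state_input)
    output[key] = task_result
    return output


state = {"date": "2020/12/23"}
result = {"Payload": "hello"}

# Without ResultPath, the "date" key from the input is lost.
print(apply_result_path(state, result, None))
# With ResultPath, the input is kept and the result is merged in.
print(apply_result_path(state, result, "$.helloLambda"))
```

This is why the second example above sets result_path on hello_task: world_task can then read the result from `$.helloLambda.Payload` while the rest of the state survives.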
By using add_catch, you can easily send a notification when an error occurs.
I recommend including $$.Execution.Id, $.Error, and $.Cause in the notification, since they make it easy to investigate the cause and re-run the execution.
from aws_cdk import (
    core,
    aws_lambda as lambdas,
    aws_stepfunctions_tasks as tasks,
    aws_stepfunctions as sf
)


class SampleStack(core.Stack):
    def __init__(self,
                 scope: core.Construct,
                 id: str,
                 **kwargs) -> None:
        super().__init__(scope, id, **kwargs)
        hello_lambda: lambdas.Function = ...  # Lambda definition
        hello_task: tasks.LambdaInvoke = tasks.LambdaInvoke(
            self,
            "[Lambda] hello",
            lambda_function=hello_lambda,
            result_path="$.helloLambda")
        world_lambda: lambdas.Function = ...  # Lambda definition
        world_task: tasks.LambdaInvoke = tasks.LambdaInvoke(
            self,
            "[Lambda] world",
            lambda_function=world_lambda,
            payload=sf.TaskInput.from_json_path_at("$.helloLambda.Payload"))
        notification_error: lambdas.Function = ...  # Lambda definition
        execution_id: str = sf.TaskInput.from_data_at("$$.Execution.Id").value
        err: str = sf.TaskInput.from_data_at("$.Error").value
        cause: str = sf.TaskInput.from_data_at("$.Cause").value
        notification_error_task: tasks.LambdaInvoke = tasks.LambdaInvoke(
            self,
            "[Lambda] notification_error",
            lambda_function=notification_error,
            payload=sf.TaskInput.from_object({
                "execution_id": execution_id,
                "error": err,
                "cause": cause
            }))
        job_failed: sf.Fail = sf.Fail(
            self,
            "Job Failed",
            cause="Job Failed",
            error="Workflow FAILED")
        error_handler: sf.Chain = notification_error_task.next(job_failed)
        hello_task.add_catch(error_handler, errors=['States.ALL'])
        world_task.add_catch(error_handler, errors=['States.ALL'])
        definition: sf.Chain = sf.Chain.start(hello_task).next(world_task)
        sf.StateMachine(self, "hello_workflow", definition=definition)
There are some points to keep in mind:

- If the error-handling task is not chained to sf.Fail with .next(), the execution is judged as successful by Step Functions.
- If you write notification_error_task.next(job_failed) in more than one place, you get Error: State '[Lambda] notification_error' already has a next state.
You can handle errors with the approach above, but using sf.Parallel lets you write it a little more cleanly.
from aws_cdk import (
    core,
    aws_lambda as lambdas,
    aws_stepfunctions_tasks as tasks,
    aws_stepfunctions as sf
)


class SampleStack(core.Stack):
    def __init__(self,
                 scope: core.Construct,
                 id: str,
                 **kwargs) -> None:
        super().__init__(scope, id, **kwargs)
        hello_lambda: lambdas.Function = ...  # Lambda definition
        hello_task: tasks.LambdaInvoke = tasks.LambdaInvoke(
            self,
            "[Lambda] hello",
            lambda_function=hello_lambda,
            result_path="$.helloLambda")
        world_lambda: lambdas.Function = ...  # Lambda definition
        world_task: tasks.LambdaInvoke = tasks.LambdaInvoke(
            self,
            "[Lambda] world",
            lambda_function=world_lambda,
            payload=sf.TaskInput.from_json_path_at("$.helloLambda.Payload"))
        notification_error: lambdas.Function = ...  # Lambda definition
        execution_id: str = sf.TaskInput.from_data_at("$$.Execution.Id").value
        err: str = sf.TaskInput.from_data_at("$.Error").value
        cause: str = sf.TaskInput.from_data_at("$.Cause").value
        notification_error_task: tasks.LambdaInvoke = tasks.LambdaInvoke(
            self,
            "[Lambda] notification_error",
            lambda_function=notification_error,
            payload=sf.TaskInput.from_object({
                "execution_id": execution_id,
                "error": err,
                "cause": cause
            }))
        job_failed: sf.Fail = sf.Fail(
            self,
            "Job Failed",
            cause="Job Failed",
            error="Workflow FAILED")
        # to_single_state wraps the chain in a Parallel state, so add_catch
        # can be attached once to the whole workflow.
        definition: sf.Parallel = sf.Chain.start(hello_task) \
            .next(world_task) \
            .to_single_state("definition")
        definition.add_catch(notification_error_task.next(job_failed))
        sf.StateMachine(self, "hello_workflow", definition=definition)
Defined this way, the catch fires if any of the tasks inside the Parallel fails.
Conditional branching can be implemented by using sf.Choice.
It looks like it could get hard to read once you write many conditional branches, but for a simple split like this it works well.
from aws_cdk import (
    core,
    aws_lambda as lambdas,
    aws_stepfunctions_tasks as tasks,
    aws_stepfunctions as sf
)


class SampleStack(core.Stack):
    def __init__(self,
                 scope: core.Construct,
                 id: str,
                 **kwargs) -> None:
        super().__init__(scope, id, **kwargs)
        hello_or_world: lambdas.Function = ...  # Lambda definition
        hello_or_world_task: tasks.LambdaInvoke = tasks.LambdaInvoke(
            self,
            "[Lambda] hello or world",
            lambda_function=hello_or_world,
            result_path="$.helloOrWorld")
        hello_lambda: lambdas.Function = ...  # Lambda definition
        hello_task: tasks.LambdaInvoke = tasks.LambdaInvoke(
            self,
            "[Lambda] hello",
            lambda_function=hello_lambda)
        world_lambda: lambdas.Function = ...  # Lambda definition
        world_task: tasks.LambdaInvoke = tasks.LambdaInvoke(
            self,
            "[Lambda] world",
            lambda_function=world_lambda)
        job_failed: sf.Fail = sf.Fail(
            self,
            "Job Failed",
            cause="Job Failed",
            error="Workflow FAILED")
        definition: sf.Chain = sf.Chain.start(hello_or_world_task) \
            .next(
                sf.Choice(self, "hello or world ?")
                .when(sf.Condition.string_equals("$.helloOrWorld.Payload", "hello"), hello_task)
                .when(sf.Condition.string_equals("$.helloOrWorld.Payload", "world"), world_task)
                .otherwise(job_failed)
            )
        sf.StateMachine(self, "hello_workflow", definition=definition)
It is very simple, but be aware that if you inadvertently forget to set result_path, the task's output replaces the whole state and all the results computed in previous steps are lost.
This was the most troublesome part I ran into. When you execute an EMR task from Step Functions (EmrAddStep), you pass the step arguments as an array:
args=[
    "spark-submit",
    "--deploy-mode",
    "cluster",
    "--master",
    "yarn",
    "--class",
    "Main",
    "hoge.jar",
    "2020/12/23/01",
    "--tz",
    "utc"
]
However, if you try to pass the processing result of the previous step in there, the type check rejects it, because args is typed as typing.Optional[typing.List[str]].
Therefore, I first put a placeholder string in args:

args=[
    "<$.hoge.Payload>"
],

and later converted the definition to a string and replaced the placeholder with a JSONPath reference to the result formatted by a Lambda. (Please tell me if there is a better way.)
prepare_workflow: sf.Chain = sf.Chain.start(emr_create_cluster).next(emr_add_step)
definition: sf.Parallel = sf.Parallel(self, id="definition")
definition.branch(prepare_workflow)
definition_json = definition.to_state_json()["Branches"][0]
definition_str = json.dumps(definition_json) \
    .replace('"Args": ["<$.hoge.Payload>"]',
             '"Args.$": "$.hoge.Payload"', 1)
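The replacement itself can be sanity-checked on a small piece of state JSON. The structure below is a simplified stand-in for what the CDK synthesizes for EmrAddStep (only the Args placeholder matters); the `"Args.$"` form is the Amazon States Language syntax that makes Step Functions resolve the value from the state at run time:

```python
import json

# Simplified stand-in for the synthesized EmrAddStep state JSON.
state_json = {
    "Type": "Task",
    "Parameters": {
        "Step": {
            "HadoopJarStep": {"Args": ["<$.hoge.Payload>"]}
        }
    }
}

# Swap the placeholder array for an "Args.$" JSONPath reference.
definition_str = json.dumps(state_json).replace(
    '"Args": ["<$.hoge.Payload>"]',
    '"Args.$": "$.hoge.Payload"', 1)

print(definition_str)
```

After the replacement, the definition no longer contains the placeholder, and the step's arguments are taken from `$.hoge.Payload` when the state machine runs.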