This time we will build an AWS Batch environment with the CDK. There are plenty of implementation examples in TypeScript but few in Python, so I wrote this article.
The execution environment is as follows. I will not cover the installation or initial setup of aws-cli and aws-cdk here. One caveat: aws-cdk is updated very frequently, so what is written here may stop working in the future.
The main concern is cost. When I ran it under the conditions below, I was charged only the EC2 fee, roughly $0.01 per day.
(In Batch, an instance is launched each time a job is submitted to the queue, and it is terminated when the job completes.)
Prepare the Batch execution environment with the following steps.
The folder structure is shown below; the number to the right of each file name corresponds to the step number above.
batch_example
└── src
    ├── docker
    │   ├── __init__.py       (1)
    │   ├── Dockerfile        (2)
    │   ├── requirements.txt  (2)
    │   └── Makefile          (3)
    └── batch_environment
        ├── app.py            (4)
        ├── cdk.json
        └── README.md
Now, let's proceed with the implementation following the steps above.
An example of the script executed inside Docker is shown below: click is used to receive command-line arguments from CMD, and watchtower is used to write logs to CloudWatch Logs.
__init__.py
# For parsing the time
from datetime import datetime
from logging import getLogger, INFO

# Installed libraries
from boto3.session import Session
import click
import watchtower


# Values are taken from environment variables when envvar is specified
@click.command()
@click.option("--time")
@click.option("--s3_bucket", envvar='S3_BUCKET')
def main(time: str, s3_bucket: str):
    execute_date = None
    if time:
        # Parse the time, assuming it is passed from CloudWatch Events
        d = datetime.strptime(time, "%Y-%m-%dT%H:%M:%SZ")
        # Get the execution date
        execute_date = d.strftime("%Y-%m-%d")

    # Logger settings
    # The logger name becomes the name of the log stream
    logger_name = f"{datetime.now().strftime('%Y/%m/%d')}"
    logger = getLogger(logger_name)
    logger.setLevel(INFO)
    # Specify the CloudWatch Logs log group name here
    # Logs are sent via the IAM role by passing a Session
    handler = watchtower.CloudWatchLogHandler(log_group="/aws/some_project", boto3_session=Session())
    logger.addHandler(handler)

    # Scheduled processing
    # Here we only write the execution date to CloudWatch Logs
    logger.info(f"{execute_date=}")


if __name__ == "__main__":
    """
    python __init__.py \
        --time 2020-09-11T12:30:00Z \
        --s3_bucket your-bucket-here
    """
    main()
Next, create a Dockerfile to run the Python script above. I used a multi-stage build, referring to the article here.
Dockerfile
# This is the build container
FROM python:3.8-buster as builder

WORKDIR /opt/app
COPY requirements.txt /opt/app
RUN pip3 install -r requirements.txt

# From here on, prepare the container used at runtime
FROM python:3.8-slim-buster as runner

COPY --from=builder /usr/local/lib/python3.8/site-packages /usr/local/lib/python3.8/site-packages
COPY src /opt/app/src

WORKDIR /opt/app/src
CMD ["python3", "__init__.py"]
At the same time, list the libraries to be used in requirements.txt.
requirements.txt
click
watchtower
After creating the Dockerfile, register the image in ECR. First, create a repository by pressing the "Create repository" button in the ECR console.
Give the repository an appropriate name.
Select the repository you created and press the "View push commands" button.
The commands required to push are then displayed, so simply copy and run them as they are. If this step fails, the AWS CLI is probably not configured correctly, so review its settings.
Since typing these commands every time is tedious, create a Makefile that reproduces them.
(The --username AWS in command 1 appears to be a fixed value.)
Makefile
.PHONY: help
help:
	@echo " == push docker image to ECR == "
	@echo "type 'make build tag push' to push docker image to ECR"
	@echo ""

# Push command (1)
.PHONY: login
login:
	aws ecr get-login-password --region ap-northeast-1 | docker login --username AWS --password-stdin {ACCOUNT_NUMBER}.dkr.ecr.ap-northeast-1.amazonaws.com

# Push command (2)
.PHONY: build
build:
	docker build -t {REPOSITORY_NAME} .

# Push command (3)
.PHONY: tag
tag:
	docker tag {REPOSITORY_NAME}:latest {ACCOUNT_NUMBER}.dkr.ecr.ap-northeast-1.amazonaws.com/{REPOSITORY_NAME}:latest

# Push command (4)
.PHONY: push
push:
	docker push {ACCOUNT_NUMBER}.dkr.ecr.ap-northeast-1.amazonaws.com/{REPOSITORY_NAME}:latest
With this Makefile, the commands can be shortened as shown below. Also, since the Makefile above contains no sensitive information even if leaked, the source can be shared as is.
# Log in to ECR
$ make login

# Build, tag, and push the latest image to ECR
$ make build tag push
The CDK implementation is based on this article written in TypeScript.
Also, run $ cdk init in advance in the directory where app.py will be implemented.
Each package name is long, and installation takes quite a while.
$ pip install aws-cdk.core aws-cdk.aws-stepfunctions aws-cdk.aws-stepfunctions-tasks aws-cdk.aws-events-targets aws-cdk.aws-ec2 aws-cdk.aws-batch aws-cdk.aws-ecr
First, create a class for the environment to be built this time.
stack_name and stack_env are passed as arguments to the BatchEnvironment class; they correspond to the name of this environment and the execution environment (verification / development / production).
(If you really want to separate execution environments, you will probably need separate ECR repositories as well; a small sketch of that idea follows below.)
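As a rough sketch of that idea (this helper is not part of the stack implemented below, and the per-environment repository names are hypothetical placeholders), the repository ARN could be switched per stack_env like this:

# Hypothetical helper, not used in the stack below:
# pick an ECR repository ARN per execution environment.
def ecr_repository_arn_for(stack_env: str) -> str:
    arns = {
        "feature": "arn:aws:ecr:ap-northeast-1:{ACCOUNT_NUMBER}:repository/{YOUR_REPOSITORY_NAME}-feature",
        "dev": "arn:aws:ecr:ap-northeast-1:{ACCOUNT_NUMBER}:repository/{YOUR_REPOSITORY_NAME}-dev",
        "prod": "arn:aws:ecr:ap-northeast-1:{ACCOUNT_NUMBER}:repository/{YOUR_REPOSITORY_NAME}-prod",
    }
    return arns[stack_env]

For simplicity, the class below keeps a single fixed ECR_REPOSITORY_ARN.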
app.py
from aws_cdk import (
    core,
    aws_ec2,
    aws_batch,
    aws_ecr,
    aws_ecs,
    aws_iam,
    aws_stepfunctions as aws_sfn,
    aws_stepfunctions_tasks as aws_sfn_tasks,
    aws_events,
    aws_events_targets,
)


class BatchEnvironment(core.Stack):
    """
    Creates the Batch environment, plus Step Functions and a CloudWatch Events rule to run it.
    """
    # ARN of the ECR repository created above
    # Images are pulled from this repository when jobs run in Batch
    ECR_REPOSITORY_ARN = "arn:aws:ecr:ap-northeast-1:{ACCOUNT_NUMBER}:repository/{YOUR_REPOSITORY_NAME}"

    def __init__(self, app: core.App, stack_name: str, stack_env: str):
        super().__init__(scope=app, id=f"{stack_name}-{stack_env}")
        # The implementation below goes here, inside __init__.
app.py
        # Inside def __init__(...):

        # Use any CIDR range you like
        cidr = "192.168.0.0/24"

        # === #
        # vpc #
        # === #
        # A VPC should be free of charge if you only use public subnets
        vpc = aws_ec2.Vpc(
            self,
            id=f"{stack_name}-{stack_env}-vpc",
            cidr=cidr,
            subnet_configuration=[
                # Define the netmask for the public subnet
                aws_ec2.SubnetConfiguration(
                    cidr_mask=28,
                    name=f"{stack_name}-{stack_env}-public",
                    subnet_type=aws_ec2.SubnetType.PUBLIC,
                )
            ],
        )

        security_group = aws_ec2.SecurityGroup(
            self,
            id=f'security-group-for-{stack_name}-{stack_env}',
            vpc=vpc,
            security_group_name=f'security-group-for-{stack_name}-{stack_env}',
            allow_all_outbound=True
        )

        batch_role = aws_iam.Role(
            scope=self,
            id=f"batch_role_for_{stack_name}-{stack_env}",
            role_name=f"batch_role_for_{stack_name}-{stack_env}",
            assumed_by=aws_iam.ServicePrincipal("batch.amazonaws.com")
        )

        batch_role.add_managed_policy(
            aws_iam.ManagedPolicy.from_managed_policy_arn(
                scope=self,
                id=f"AWSBatchServiceRole-{stack_env}",
                managed_policy_arn="arn:aws:iam::aws:policy/service-role/AWSBatchServiceRole"
            )
        )

        batch_role.add_to_policy(
            aws_iam.PolicyStatement(
                effect=aws_iam.Effect.ALLOW,
                resources=[
                    "arn:aws:logs:*:*:*"
                ],
                actions=[
                    "logs:CreateLogGroup",
                    "logs:CreateLogStream",
                    "logs:PutLogEvents",
                    "logs:DescribeLogStreams"
                ]
            )
        )

        # Role given to the EC2 instances
        instance_role = aws_iam.Role(
            scope=self,
            id=f"instance_role_for_{stack_name}-{stack_env}",
            role_name=f"instance_role_for_{stack_name}-{stack_env}",
            assumed_by=aws_iam.ServicePrincipal("ec2.amazonaws.com")
        )

        instance_role.add_managed_policy(
            aws_iam.ManagedPolicy.from_managed_policy_arn(
                scope=self,
                id=f"AmazonEC2ContainerServiceforEC2Role-{stack_env}",
                managed_policy_arn="arn:aws:iam::aws:policy/service-role/AmazonEC2ContainerServiceforEC2Role"
            )
        )

        # Add a policy to access S3
        instance_role.add_to_policy(
            aws_iam.PolicyStatement(
                effect=aws_iam.Effect.ALLOW,
                resources=["*"],
                actions=["s3:*"]
            )
        )

        # Add a policy to access CloudWatch Logs
        instance_role.add_to_policy(
            aws_iam.PolicyStatement(
                effect=aws_iam.Effect.ALLOW,
                resources=[
                    "arn:aws:logs:*:*:*"
                ],
                actions=[
                    "logs:CreateLogGroup",
                    "logs:CreateLogStream",
                    "logs:PutLogEvents",
                    "logs:DescribeLogStreams"
                ]
            )
        )

        # Attach the role to EC2 via an instance profile
        instance_profile = aws_iam.CfnInstanceProfile(
            scope=self,
            id=f"instance_profile_for_{stack_name}-{stack_env}",
            instance_profile_name=f"instance_profile_for_{stack_name}-{stack_env}",
            roles=[instance_role.role_name]
        )
app.py
        # Continuing from the VPC section above...

        # ===== #
        # batch #
        # ===== #
        batch_compute_resources = aws_batch.ComputeResources(
            vpc=vpc,
            maxv_cpus=4,
            minv_cpus=0,
            security_groups=[security_group],
            instance_role=instance_profile.attr_arn,
            type=aws_batch.ComputeResourceType.SPOT
        )

        batch_compute_environment = aws_batch.ComputeEnvironment(
            scope=self,
            id=f"ProjectEnvironment-{stack_env}",
            compute_environment_name=f"ProjectEnvironmentBatch-{stack_env}",
            compute_resources=batch_compute_resources,
            service_role=batch_role
        )

        job_role = aws_iam.Role(
            scope=self,
            id=f"job_role_{stack_name}-{stack_env}",
            role_name=f"job_role_{stack_name}-{stack_env}",
            assumed_by=aws_iam.ServicePrincipal("ecs-tasks.amazonaws.com")
        )

        job_role.add_managed_policy(
            aws_iam.ManagedPolicy.from_managed_policy_arn(
                scope=self,
                id=f"AmazonECSTaskExecutionRolePolicy_{stack_name}-{stack_env}",
                managed_policy_arn="arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy"
            )
        )

        job_role.add_managed_policy(
            aws_iam.ManagedPolicy.from_managed_policy_arn(
                scope=self,
                id=f"AmazonS3FullAccess_{stack_name}-{stack_env}",
                managed_policy_arn="arn:aws:iam::aws:policy/AmazonS3FullAccess"
            )
        )

        job_role.add_managed_policy(
            aws_iam.ManagedPolicy.from_managed_policy_arn(
                scope=self,
                id=f"CloudWatchLogsFullAccess_{stack_name}-{stack_env}",
                managed_policy_arn="arn:aws:iam::aws:policy/CloudWatchLogsFullAccess"
            )
        )

        batch_job_queue = aws_batch.JobQueue(
            scope=self,
            id=f"job_queue_for_{stack_name}-{stack_env}",
            job_queue_name=f"job_queue_for_{stack_name}-{stack_env}",
            compute_environments=[
                aws_batch.JobQueueComputeEnvironment(
                    compute_environment=batch_compute_environment,
                    order=1
                )
            ],
            priority=1
        )

        # Look up the ECR repository
        ecr_repository = aws_ecr.Repository.from_repository_arn(
            scope=self,
            id=f"image_for_{stack_name}-{stack_env}",
            repository_arn=self.ECR_REPOSITORY_ARN
        )

        # Container image pulled from ECR
        container_image = aws_ecs.ContainerImage.from_ecr_repository(
            repository=ecr_repository
        )

        # Job definition
        # S3_BUCKET is passed as an environment variable so the Python script can use it
        batch_job_definition = aws_batch.JobDefinition(
            scope=self,
            id=f"job_definition_for_{stack_env}",
            job_definition_name=f"job_definition_for_{stack_env}",
            container=aws_batch.JobDefinitionContainer(
                image=container_image,
                environment={
                    "S3_BUCKET": "{YOUR_S3_BUCKET}"
                },
                job_role=job_role,
                vcpus=1,
                memory_limit_mib=1024
            )
        )
The rest is not strictly required for the Batch environment itself: Step Functions and CloudWatch Events are used to run it on a schedule.
You could also call Batch directly from CloudWatch Events, but Step Functions is placed in between to make it easier to integrate with other services and to pass parameters.
When registering the Batch job as a Step Functions task, the Docker CMD command (the one set in the Batch job definition) is overridden so that the time argument coming from CloudWatch Events is passed on to the Python script.
app.py
        # Continuing from the Batch section above...

        # ============= #
        # StepFunctions #
        # ============= #
        command_overrides = [
            "python", "__init__.py",
            "--time", "Ref::time"
        ]
        batch_task = aws_sfn_tasks.BatchSubmitJob(
            scope=self,
            id=f"batch_job_{stack_env}",
            job_definition=batch_job_definition,
            job_name=f"batch_job_{stack_env}_today",
            job_queue=batch_job_queue,
            container_overrides=aws_sfn_tasks.BatchContainerOverrides(
                command=command_overrides
            ),
            payload=aws_sfn.TaskInput.from_object(
                {
                    "time.$": "$.time"
                }
            )
        )

        # There is only one step this time, so it is simple, but if you want to chain
        # multiple steps you can connect them with method chaining like:
        # batch_task.next(aws_sfn_tasks.JOB).next(aws_sfn_tasks.JOB)
        definition = batch_task

        sfn_daily_process = aws_sfn.StateMachine(
            scope=self,
            id=f"YourProjectSFn-{stack_env}",
            definition=definition
        )

        # ================ #
        # CloudWatch Event #
        # ================ #
        # Run every day at 21:31 JST (12:31 UTC)
        # See https://docs.aws.amazon.com/lambda/latest/dg/tutorial-scheduled-events-schedule-expressions.html
        events_daily_process = aws_events.Rule(
            scope=self,
            id=f"DailySFnProcess-{stack_env}",
            schedule=aws_events.Schedule.cron(
                minute="31",
                hour="12",
                month="*",
                day="*",
                year="*"),
        )
        events_daily_process.add_target(aws_events_targets.SfnStateMachine(sfn_daily_process))

        # End of def __init__(...)
Finally, write the code that executes the CDK app, and you're done.
app.py
# After the BatchEnvironment class definition (outside def __init__)
def main():
    app = core.App()
    BatchEnvironment(app, "your-project", "feature")
    BatchEnvironment(app, "your-project", "dev")
    BatchEnvironment(app, "your-project", "prod")
    app.synth()


if __name__ == "__main__":
    main()
Once the script above is complete, check that the CDK is configured correctly with the following commands, and then deploy. Even building the Batch environment from scratch finishes in about 10 minutes.
# Check the generated definition
$ cdk synth
Successfully synthesized to {path_your_project}/cdk.out
Supply a stack id (your-project-dev, your-project-feature, your-project-prod) to display its template.

# List the deployable stacks
$ cdk ls
your-project-dev
your-project-feature
your-project-prod

# Deploy the feature environment
$ cdk deploy your-project-feature
...deploying...
When the deployment is complete, select the state machine you created in the Step Functions console and press the "Start execution" button.
Pass only the time argument as the input:

{
    "time": "2020-09-27T12:31:00Z"
}

If it runs correctly, you're done. Also check CloudWatch Logs to confirm that it behaves as expected.
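If you would rather kick off the execution from a script than from the console, the following is a minimal boto3 sketch (it is not part of the CDK stack; the state machine ARN is a placeholder you need to fill in, and the log group name matches the one used in __init__.py above):

# Minimal sketch: start the state machine and read back the logs written by the job.
# STATE_MACHINE_ARN is a placeholder to replace with the deployed state machine's ARN.
import json
import time as time_module

import boto3

REGION = "ap-northeast-1"
STATE_MACHINE_ARN = "arn:aws:states:ap-northeast-1:{ACCOUNT_NUMBER}:stateMachine:{YOUR_STATE_MACHINE_NAME}"

sfn = boto3.client("stepfunctions", region_name=REGION)
logs = boto3.client("logs", region_name=REGION)

# Same input as the console example above
execution = sfn.start_execution(
    stateMachineArn=STATE_MACHINE_ARN,
    input=json.dumps({"time": "2020-09-27T12:31:00Z"}),
)
print("started:", execution["executionArn"])

# Poll until the execution finishes (Batch launches an instance first,
# so this can take several minutes)
while True:
    status = sfn.describe_execution(executionArn=execution["executionArn"])["status"]
    if status != "RUNNING":
        break
    time_module.sleep(30)
print("finished with status:", status)

# Show the log events written by the Python script via watchtower
for event in logs.filter_log_events(logGroupName="/aws/some_project").get("events", []):
    print(event["message"])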
I really like the CDK because you can build and tear down the whole environment quickly with a few commands!
Also, compared with creating resources from the console, the parameters in the code make it clear what each service actually requires, so even for services you don't know well it helps you understand which parameters are needed.
(Someday, I will publish the source above in a GitHub repository...!)