This is a batch-like process that moves data from a database in one AWS account to a database in another account (VPC peering already configured).

Partly to get hands-on experience with AWS services while studying them, I put DynamoDB in between and split the processing into [Lambda](https://docs.aws.amazon.com/ja_jp/lambda/latest/dg/welcome.html) functions. The RDS access credentials are stored in Secrets Manager, and the whole flow is assembled with [Step Functions](https://docs.aws.amazon.com/ja_jp/step-functions/latest/dg/welcome.html), scheduled by CloudWatch Events.
Let's build each one.
## ① Lambda [RDS to DynamoDB]

Accessing RDS from Lambda is considered an anti-pattern because of the connection pool problem[^1], but since this batch is triggered on a schedule, the number of concurrent executions should not be an issue.
cf. Lambda + RDS is an anti-pattern
[^1]: This issue seems to be resolved by RDS Proxy, which was announced in preview.
Place the function in a VPC for RDS access. The required IAM policies are:

- `AWSLambdaVPCAccessExecutionRole`
- Secrets Manager read permissions (`DescribeSecret`, `GetSecretValue`)
- DynamoDB write permissions (`DescribeTable`, `PutItem`)

The runtime is **Python 3.8**. In addition to `boto3`, the function uses `pymysql` for RDS (MySQL) access. (Run `pip install` locally and upload as a zip.)
### Secrets Manager

Secrets Manager stores secret information for RDS, DocumentDB, and so on, and can automatically manage password rotation depending on the settings. You can store any item as a secret, not just an ID and password: host, port, database name, etc.
By the way, when choosing "RDS" as the secret type here, only RDS instances in the current account can be selected. Since I wanted to store the secret for the RDS instance in the peered account, I created it as an arbitrary secret under "Other". (No rotation.)
Once the secret is created, sample code for each language is generated; you can use it as-is by copy and paste. (It can also be viewed again later.)
The sample uses `get_secret_value_response['SecretString']` (or `base64.b64decode(get_secret_value_response['SecretBinary'])` for binary secrets), so I return that value from the helper.
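As a reference, a minimal sketch of such a `get_secret()` helper, assuming a hypothetical secret name and region and the string-type secret used here:

```python
import boto3

def get_secret():
    # Secret name and region are hypothetical
    client = boto3.client('secretsmanager', region_name='ap-northeast-1')
    res = client.get_secret_value(SecretId='my-rds-secret')
    return res['SecretString']  # JSON string of the stored key/value pairs
```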
The caller looks like this.
#### **`lambda.py`**
```python
import json

import pymysql

secrets = json.loads(get_secret())
conn = pymysql.connect(
    host=secrets['host'],
    user=secrets['username'],
    passwd=secrets['password'],
    db=secrets['dbname'],
    cursorclass=pymysql.cursors.DictCursor
)
```
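The `data` transferred to DynamoDB in the `PutItem` loop below can then be fetched through this connection. A minimal sketch, assuming a hypothetical source table:

```python
# Fetch the rows to transfer (table name and query are hypothetical)
with conn.cursor() as cursor:
    cursor.execute('SELECT * FROM source_table')
    data = cursor.fetchall()  # rows come back as dicts thanks to DictCursor
```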
### DynamoDB

Create the required table in advance. The primary key consists of a **partition key** (required) and a **sort key** (optional).
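For reference, a minimal sketch of creating such a table with boto3 (table name, key names, types, and billing mode are all hypothetical):

```python
import boto3

dynamodb = boto3.resource('dynamodb')

# A table with both a partition key and a sort key
table = dynamodb.create_table(
    TableName='table-name',
    KeySchema=[
        {'AttributeName': 'partition-key-name', 'KeyType': 'HASH'},
        {'AttributeName': 'sort-key-name', 'KeyType': 'RANGE'},
    ],
    AttributeDefinitions=[
        {'AttributeName': 'partition-key-name', 'AttributeType': 'S'},
        {'AttributeName': 'sort-key-name', 'AttributeType': 'S'},
    ],
    BillingMode='PAY_PER_REQUEST',
)
```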
cf. I summarized the key index of DynamoDB
Records in a relational DB are apparently called **items** in DynamoDB.
Data is input with `PutItem`.
The supported data types differ from MySQL's, so trying to input a date type as-is raises an error.
cf. Naming Rules and Data Types #Data Types
Also, since empty strings cannot be input, you need to explicitly specify the Null type instead.
cf. The story that data could not be entered in DynamoDB if it was left empty when using Boto3
#### **`lambda.py`**
```python
import datetime

import boto3
...
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('table-name')

for row in data:
    for k, v in row.items():
        if isinstance(v, str) and v == '':
            # DynamoDB raises an error on empty strings, so explicitly set None (the Null type)
            row[k] = None
        elif isinstance(v, (datetime.date, datetime.datetime, datetime.time)):
            # DynamoDB does not support date/time types, so convert to a string
            row[k] = v.isoformat()  # strftime works just as well
    # Overwrites if an item with the same key already exists
    res = table.put_item(
        Item=row
    )
```
There is also `UpdateItem`, which updates only the specified attributes, similar to UPDATE in an RDB, but here I used `PutItem`.
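A minimal `UpdateItem` sketch, assuming hypothetical key values and attribute names:

```python
# Update a single attribute on an existing item (names and values are hypothetical)
res = table.update_item(
    Key={
        'partition-key-name': VALUE1,
        'sort-key-name': VALUE2
    },
    UpdateExpression='SET #st = :val',
    ExpressionAttributeNames={'#st': 'status'},  # alias, useful for reserved words
    ExpressionAttributeValues={':val': 'processed'}
)
```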
## ② Lambda [DynamoDB to RDS]

This function is also placed in the VPC, and the IAM policies are as follows:
- `AWSLambdaVPCAccessExecutionRole`
- Secrets Manager read permissions (`DescribeSecret`, `GetSecretValue`)
- DynamoDB read permissions (`DescribeTable`, `GetItem`, `Scan`, `Query`)

### DynamoDB
`GetItem` retrieves a single item by specifying its primary key.
#### **`lambda.py`**
```python
import boto3
...
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('table-name')

# GetItem (with a sort key)
res = table.get_item(
    Key={
        'partition-key-name': VALUE1,
        'sort-key-name': VALUE2
    }
)
res['Item']  # the retrieved item; the 'Item' key is absent when nothing matches
```
There are also `Query` and `Scan`. As a rough guide to using them properly: `Query` narrows results by key conditions, while `Scan` reads the entire table, so basically it seems best to use `Query`.
cf. I tried DynamoDB using Python (boto3)
#### **`lambda.py`**
```python
import boto3
from boto3.dynamodb.conditions import Key  # needed for key conditions
...
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('table-name')

# Query: by primary key (partition key & sort key)
res = table.query(
    KeyConditionExpression=Key('partition-key-name').eq(VALUE1) & Key('sort-key-name').eq(VALUE2)
)

# Query: by index (partition key only)
res = table.query(
    IndexName='index-name',
    KeyConditionExpression=Key('partition-key-name').eq(VALUE3)
)

res['Count']  # number of items retrieved
res['Items']  # list of retrieved items
```
When you create an index in DynamoDB, you choose a **projection**, which behaves like a copied table. Depending on the table's data size, it can be better to project only the keys rather than all attributes, and then process in two steps: get the keys via the index, then get the full items again by key.
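A minimal sketch of that two-step pattern, assuming a hypothetical keys-only index (`keys-only-index`, keyed on a hypothetical `gsi-partition-key-name`) and the table key names used above:

```python
import boto3
from boto3.dynamodb.conditions import Key

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('table-name')

# Step 1: get only the table keys from the keys-only index
res = table.query(
    IndexName='keys-only-index',
    KeyConditionExpression=Key('gsi-partition-key-name').eq(VALUE1)
)

# Step 2: re-fetch each full item by its primary key
items = [
    table.get_item(Key={
        'partition-key-name': r['partition-key-name'],
        'sort-key-name': r['sort-key-name']
    })['Item']
    for r in res['Items']
]
```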
## ③ Step Functions

Step Functions builds a processing flow in JSON notation. (What can be combined is not limited to Lambda.) Since you can write conditional branches and parallel processing, flows can be assembled quite flexibly. Honestly, the bare definition is hard to read and write, but the console helps by checking the syntax and automatically drawing a flow diagram. A completed flow is called a **state machine**.
cf. Serverless batch system created with AWS Step Functions
To call Lambda, the state machine's IAM policy needs `lambda:InvokeFunction`.
The functions in ① and ② are actually subdivided a little further, so here I assemble them into a flow. Serial and parallel processing look roughly like the following.
#### **Serial processing of Lambda functions**
```json
{
  "Comment": "Comment Comment Comment",
  "StartAt": "First Process",
  "States": {
    "First Process": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:REGION:ACCOUNT:function:FUNCTION_NAME",
      "Next": "Second Process"
    },
    "Second Process": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:REGION:ACCOUNT:function:FUNCTION_NAME",
      "End": true
    }
  }
}
```
#### **Parallel processing of Lambda functions**
```json
{
  "Comment": "Comment Comment Comment",
  "StartAt": "Main Process",
  "States": {
    "Main Process": {
      "Type": "Parallel",
      "End": true,
      "Branches": [
        {
          "StartAt": "Branch Process A",
          "States": {
            "Branch Process A": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:REGION:ACCOUNT:function:FUNCTION_NAME",
              "End": true
            }
          }
        },
        {
          "StartAt": "Branch Process B",
          "States": {
            "Branch Process B": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:REGION:ACCOUNT:function:FUNCTION_NAME",
              "End": true
            }
          }
        }
      ]
    }
  }
}
```
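For a quick test, a state machine can also be started manually from Python. A minimal sketch, with a hypothetical state machine ARN:

```python
import boto3

sfn = boto3.client('stepfunctions')

# The ARN is hypothetical; 'input' is the JSON passed to the first state
res = sfn.start_execution(
    stateMachineArn='arn:aws:states:REGION:ACCOUNT:stateMachine:STATE_MACHINE_NAME',
    input='{}'
)
```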
### CloudWatch Events

State machines can be triggered by CloudWatch Events.
A schedule is specified with either a `rate` expression or a `cron` expression. I got tripped up by `cron`, so here are my notes:
- Six fields: minute, hour, day-of-month, month, day-of-week, year
- One of day-of-month and day-of-week must be `?` (using `*` for both, or combining `*` with a value, is not allowed)
- Times are UTC regardless of region, so specify 9 hours earlier than Japan time (see the example below)
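For example, a rule that should fire every day at 09:00 JST must be written as 00:00 UTC. A minimal sketch with boto3, using a hypothetical rule name:

```python
import boto3

events = boto3.client('events')

# Every day at 09:00 JST = 00:00 UTC; note the '?' in the day-of-week field
events.put_rule(
    Name='daily-batch-trigger',  # hypothetical rule name
    ScheduleExpression='cron(0 0 * * ? *)',
    State='ENABLED'
)
```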
cf. Schedule Expressions for Rules
All of these services were new to me, but they were very interesting. Let's go serverless!
That said, once you actually try it, you start wondering how best to manage Lambda code and the development environment...