I recently migrated data from DynamoDB to Aurora MySQL, so I will write up how I did it. Note that this is not a migration of every table; it only covers the data stored in a single table.
There are several ways to migrate from DynamoDB to Aurora MySQL, but I wrote a quick, throwaway Python script to get it done quickly and reliably. Even though it is throwaway code, the data may need to be migrated again someday, so I made it reusable.
First, create models for DynamoDB and Aurora MySQL. Using the standard library's dataclass, you can write them concisely and neatly.
One point to note: the scan function is designed to call itself recursively, to handle the case where a single request cannot fetch every record in the DynamoDB table because of the size limit.
A single Scan operation reads up to the maximum number of items set (if using the Limit parameter) or a maximum of 1 MB of data, and then applies any filtering to the results using FilterExpression. If LastEvaluatedKey is present in the response, you need to paginate the result set.
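As an aside, boto3's low-level DynamoDB client also ships a Scan paginator that follows LastEvaluatedKey for you. A minimal sketch is shown below; note that, unlike the Table resource used in the model, the low-level client returns items in DynamoDB's attribute-value format.
import boto3

# Sketch of the alternative: let boto3 paginate the scan instead of recursing.
# The low-level client returns items as {"user_id": {"S": "..."}, ...}.
client = boto3.client("dynamodb")
paginator = client.get_paginator("scan")

items = []
for page in paginator.paginate(TableName="qiita"):
    items.extend(page["Items"])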
python:models/dynamodb.py
from decimal import Decimal
from dataclasses import dataclass, asdict
from typing import Any, ClassVar, Dict, List, Optional

import boto3

from src.utils import convert_to_datetime


@dataclass
class BaseDynamoDB:
    table_name: ClassVar[str]
    hash_key: ClassVar[str]

    @classmethod
    def get_client(cls) -> Any:
        client = boto3.resource("dynamodb")
        return client.Table(cls.table_name)

    @classmethod
    def scan(
        cls, *, recursive: bool = False, exclusive_start_key: Optional[Dict] = None,
    ) -> List["BaseDynamoDB"]:
        """Get some or all items from the DynamoDB table

        Args:
            recursive (bool):
                False (default) fetches only a single page of results.
                Set to True to follow LastEvaluatedKey and get all records.
            exclusive_start_key (Dict): Primary key of the first item to scan

        Returns:
            List["BaseDynamoDB"]: List of table model instances
        """
        client = cls.get_client()
        options = {}
        if exclusive_start_key:
            # Resume the scan from the key returned by the previous page
            options["ExclusiveStartKey"] = exclusive_start_key
        response = client.scan(**options)
        items = list(map(lambda item: cls(**item), response["Items"]))  # type: ignore
        if recursive and "LastEvaluatedKey" in response:
            tmp = cls.scan(
                recursive=True,
                exclusive_start_key=response["LastEvaluatedKey"],
            )
            items.extend(tmp)
        return items


@dataclass
class Qiita(BaseDynamoDB):
    """Fictitious table for Qiita"""

    table_name: ClassVar[str] = "qiita"
    hash_key: ClassVar[str] = "user_id"

    user_id: str
    created_at: int
    updated_at: int
    memo: str = ""

    def __post_init__(self) -> None:
        # DynamoDB returns numbers as Decimal; convert the timestamps to datetime
        for attr in ("updated_at", "created_at"):
            v = getattr(self, attr)
            if isinstance(v, Decimal):
                setattr(self, attr, convert_to_datetime(str(v)))

    def to_dict(self) -> Dict[str, Any]:
        """Return the instance as a dictionary

        Returns:
            Dict[str, Any]
        """
        return asdict(self)
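The convert_to_datetime and get_logger helpers imported from src.utils are not included in the original post. Here is a minimal hypothetical sketch, assuming the DynamoDB timestamps are stored as epoch seconds:
python:src/utils.py (hypothetical sketch; not part of the original post)
import logging
from datetime import datetime


def convert_to_datetime(value: str) -> datetime:
    # Assumption: created_at / updated_at are stored as UNIX epoch seconds.
    return datetime.fromtimestamp(float(value))


def get_logger(name: str) -> logging.Logger:
    logger = logging.getLogger(name)
    if not logger.handlers:
        handler = logging.StreamHandler()
        handler.setFormatter(
            logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s")
        )
        logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    return logger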
python:models/aurora.py
from datetime import datetime
from dataclasses import asdict, dataclass, field, InitVar
from typing import Any, ClassVar, Dict


@dataclass
class Qiita:
    """Fictitious table for Qiita"""

    table_name: ClassVar[str] = "qiita"
    primary_key: ClassVar[str] = "user_id"

    user_id: str
    # DynamoDB date and time management columns
    created_at: InitVar[datetime]
    updated_at: InitVar[datetime]
    # Aurora MySQL date and time management columns
    registration_date: datetime = field(init=False)
    update_date: datetime = field(init=False)
    memo: str = ""
    registration_id: str = "DynamoDB"
    update_id: str = "DynamoDB"

    def __post_init__(
        self, created_at: datetime, updated_at: datetime
    ) -> None:
        # Map the DynamoDB column names onto the Aurora MySQL column names
        self.registration_date = created_at
        self.update_date = updated_at

    def to_dict(self) -> Dict[str, Any]:
        """Return the instance as a dictionary

        Returns:
            Dict[str, Any]
        """
        result = asdict(self)
        result["registration_date"] = self.registration_date
        result["update_date"] = self.update_date
        return result
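Because the DynamoDB model's to_dict output uses created_at and updated_at as keyword arguments, it can be fed straight into this Aurora model's constructor. An illustrative example (values are made up):
from datetime import datetime

from src.models.aurora import Qiita

# Illustrative only: what one converted record looks like.
row = Qiita(
    user_id="alice",
    created_at=datetime(2020, 1, 1, 12, 0),
    updated_at=datetime(2020, 6, 1, 12, 0),
    memo="hello",
).to_dict()
# => {"user_id": "alice", "registration_date": datetime(2020, 1, 1, 12, 0),
#     "update_date": datetime(2020, 6, 1, 12, 0), "memo": "hello",
#     "registration_id": "DynamoDB", "update_id": "DynamoDB"}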
python:connection.py
import os
from contextlib import contextmanager
from typing import Iterator

import pymysql
from pymysql.connections import Connection

from src.utils import get_logger

logger = get_logger(__name__)

AURORA_DB = os.environ["AURORA_DB"]
AURORA_HOST = os.environ["AURORA_HOST"]
AURORA_PORT = int(os.environ["AURORA_PORT"])
AURORA_USER = os.environ["AURORA_USER"]
AURORA_PASSWORD = os.environ["AURORA_PASSWORD"]


@contextmanager
def connect() -> Iterator[Connection]:
    """Establish a connection with Aurora MySQL

    Returns:
        Iterator[Connection]
    """
    try:
        conn = pymysql.connect(
            db=AURORA_DB,
            host=AURORA_HOST,
            port=AURORA_PORT,
            user=AURORA_USER,
            password=AURORA_PASSWORD,
            charset="utf8mb4",
            cursorclass=pymysql.cursors.DictCursor,
            connect_timeout=120,
        )
    except Exception as err:
        logger.error("Failed to connect to Aurora")
        raise err

    try:
        yield conn
    finally:
        conn.close()
Below is the main script. If you change the model names, you can reuse it as many times as you like.
python:main.py
from src.db.connection import connect
from src.db.sql import INSERT_QIITA
from src.models.aurora import Qiita as AuroraQiita
from src.models.dynamodb import Qiita as DynamoDBQiita
from src.utils import get_logger

logger = get_logger(__name__)


def main():
    logger.info("START")

    # Fetch every record from the DynamoDB table
    items = DynamoDBQiita.scan(recursive=True)
    logger.info(f"Acquired {len(items)} items")

    # Convert each DynamoDB record into the Aurora MySQL column layout
    params = list(
        map(
            lambda item: AuroraQiita(**item.to_dict()).to_dict(),
            items,
        )
    )

    try:
        with connect() as conn:
            with conn.cursor() as cursor:
                cursor.executemany(INSERT_QIITA, params)
                count = cursor.rowcount
            conn.commit()
    except Exception as err:
        logger.error(err)
        logger.error("INSERT failed")
    else:
        logger.info(f"Successfully inserted {count} rows")

    logger.info("END")


if __name__ == "__main__":
    main()
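main.py also imports INSERT_QIITA from src.db.sql, which is not shown in the original post. A hypothetical sketch, assuming the Aurora table columns have the same names as the fields of the models/aurora.py dataclass (pymysql's %(name)s placeholders are filled from the dictionaries produced by to_dict):
python:src/db/sql.py (hypothetical sketch; not part of the original post)
# Assumption: the Aurora MySQL table "qiita" has columns matching the dataclass fields.
INSERT_QIITA = """
INSERT INTO qiita (
    user_id, memo, registration_date, registration_id, update_date, update_id
) VALUES (
    %(user_id)s, %(memo)s, %(registration_date)s, %(registration_id)s,
    %(update_date)s, %(update_id)s
)
"""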
IAM
Create an IAM user with read-only access to DynamoDB and obtain an access key ID and secret access key. Aurora MySQL authenticates with a username and password like regular MySQL, so no IAM setup is needed on that side.
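For example, attaching the AWS managed policy AmazonDynamoDBReadOnlyAccess is enough; if you prefer a narrower inline policy, something like the following sketch (the wildcarded ARN is a placeholder) covers the scan:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["dynamodb:Scan", "dynamodb:DescribeTable"],
      "Resource": "arn:aws:dynamodb:*:*:table/qiita"
    }
  ]
}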
.env
Prepare a .env file so that pipenv can read the credentials.
.env
AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=
AWS_DEFAULT_REGION=
AURORA_DB=
AURORA_HOST=
AURORA_PORT=
AURORA_USER=
AURORA_PASSWORD=
Makefile
I wrote a Makefile so that the same script can be run against both staging and production with a single short command. Since I want pipenv to read the secrets from the .env file when running the Python script, the target creates a symbolic link from .env to the per-environment .env.$(stage) file.
Makefile
stage = stg

.PHONY: dymy
dymy:
	ln -sf .env.$(stage) .env
	pipenv run python -m src.main
As an aside, if you just want to run a Python script with pipenv, you can register it in the scripts section of the Pipfile, but I recommend a Makefile when you also need extra steps like this. A shell script would work too, but I personally prefer a Makefile.
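For reference, the Pipfile equivalent would look roughly like this (the entry name is arbitrary) and runs with pipenv run dymy, although it cannot switch the .env symlink per environment the way the Makefile target does:
Pipfile
[scripts]
dymy = "python -m src.main"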
When executing against the production environment, specify prod for the stage variable.
make dymy stage=prod
This completes the data migration from DynamoDB to Aurora MySQL.
If you want to check that the migration succeeded, verify that the number of items stored in the source DynamoDB table matches the count returned by running SELECT COUNT(*) FROM <table> against the destination Aurora MySQL table.
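If you want to automate that check, here is a quick hypothetical sketch that reuses the model and connection above (table name qiita assumed):
from src.db.connection import connect
from src.models.dynamodb import Qiita as DynamoDBQiita

# Count the items on both sides and compare them.
dynamo_count = len(DynamoDBQiita.scan(recursive=True))

with connect() as conn:
    with conn.cursor() as cursor:
        cursor.execute("SELECT COUNT(*) AS cnt FROM qiita")
        aurora_count = cursor.fetchone()["cnt"]

assert dynamo_count == aurora_count, f"{dynamo_count} != {aurora_count}"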