Entre deux comptes AWS (appairage de VPC terminé), traitez les données d'une base de données à l'autre pour créer un lot comme.
En utilisant activement les services AWS et en étudiant, Tout d'abord, placez DynamoDB entre et divisez [Lambda](https://docs.aws.amazon. com / ja_jp / lambda / latest / dg / welcome.html) Il est traité par fonction.
Les informations d'accès à RDS sont stockées à l'aide de Secrets Manager et la série de flux est [Step Functions]. J'ai essayé de le construire avec (https://docs.aws.amazon.com/ja_jp/step-functions/latest/dg/welcome.html). Planifiez ceci sur Événements ClooudWatch.
Construisons chacun.
① Lambda [RDS to DynamoDB] L'accès à RDS avec Lambda est un anti-pattern en raison du problème du nombre de pools de connexions [^ 1], Étant donné que ce lot est un déclencheur de planification, le nombre d'exécutions simultanées ne devrait pas être un problème.
cf. Lambda + RDS est un anti-pattern
[^ 1]: Aperçu du proxy RDS annoncé Semble être résolu.
Placez-le dans le VPC pour l'accès RDS. Les stratégies IAM requises sont les suivantes.
AWSLambdaVPCAccessExecutionRole
--SecretsManager autorisation de lecture (DescribeSecret
, GetSecretValue
)DescribeTable
, PutItem
)Le runtime est ** Python 3.8 **.
En plus de boto3
, la bibliothèque utilise pymysql
pour l'accès RDS (MySQL).
(Pip installer localement et télécharger le zip)
Secrets Manager Il stocke des informations secrètes telles que RDS et DocumentDB, et semble gérer automatiquement la rotation des mots de passe en fonction des paramètres. Vous pouvez stocker n'importe quel élément en tant que secret, pas seulement votre identifiant et votre mot de passe. Exemple) Hôte, port, nom de la base de données, etc.
Au fait, lorsque j'essaye de définir avec "RDS" ici, je ne peux sélectionner que des instances RDS sous le compte correspondant. Cette fois, je voulais créer les informations secrètes du RDS de la destination Peering, j'ai donc décidé de les créer comme un secret arbitraire dans "Autre". (Pas de rotation)
Après la création, un exemple de code pour chaque langue sera généré. Vous pouvez simplement le copier et le coller. (Peut être confirmé plus tard)
get_secret_value_response['SecretString']Ou base64.b64decode(get_secret_value_response['SecretBinary'])On dit qu'il sera utilisé, donc je vais le retourner.
L'appelant ressemble à ceci.
#### **`lambda.py`**
```python
secrets = json.loads(get_secret())
conn = pymysql.connect(
secrets['host'],
user=secrets['username'],
passwd=secrets['password'],
db=secrets['dbname'],
cursorclass=pymysql.cursors.DictCursor
)
DynamoDB Créez la table requise à l'avance. La clé primaire se compose d'une ** clé de partition ** (obligatoire) et d'une ** clé de tri ** (facultatif).
cf. J'ai résumé l'index clé de DynamoDB
Les enregistrements dans Relational DB semblent être appelés ** Item ** dans DynamoDB.
Entrez les données avec PutItem
.
Les types pris en charge sont différents des types MySQL, donc si vous essayez de saisir le type de date tel quel, une erreur se produira.
cf. Règles de dénomination et types de données # Types de données
De plus, comme les caractères vides ne peuvent pas être saisis, il est nécessaire de spécifier explicitement le type Null.
lambda.py
import boto3
...
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('table-name')
for row in data:
for k,v in row.items():
if isinstance(v, str) and v == '':
#Caractères vides dans DynamoDB''Spécifiez explicitement None car une erreur se produira si vous essayez d'entrer
row[k] = None
elif isinstance(v, (datetime.date, datetime.datetime, datetime.time)):
#DynamoDB ne prend pas en charge le type date / heure, alors convertissez-le en chaîne de caractères
row[k] = v.isoformat() #Bien sûr, strftime est bien
#Écrase si la clé existe
res = table.put_item(
Item=row
)
Il existe également un ʻUpdateItem` qui spécifie la colonne à mettre à jour, similaire à UPDATE dans RDB.
② Lambda [DynamoDB to RDS] Il est également placé dans le VPC et la stratégie IAM est la suivante.
AWSLambdaVPCAccessExecutionRole
--SecretsManager autorisation de lecture (DescribeSecret
, GetSecretValue
)DescribeTable
, GetItem
, Scan
, Query
)DynamoDB
Obtenez 1 élément avec GetItem
de la spécification de clé primaire.
lambda.py
import boto3
...
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('table-name')
#GetItem (avec clé de tri)
res = table.get_item(
Key={
'partition-key-name': VALUE1,
'sort-key-name': VALUE2
}
)
res['Item'] #L'objet acquis. Si tu ne peux pas l'obtenir'Item'N'existe pas.
Il y a aussi «Requête» et «Scan». Pour comprendre comment l'utiliser correctement ...
Fondamentalement, il semble bon d'utiliser «Query».
cf. J'ai essayé DynamoDB en utilisant Python (boto3)
lambda.py
import boto3
from boto3.dynamodb.conditions import Key #j'ai besoin de ça
...
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('table-name')
#Query:Spécification de clé (& avec clé de tri)
res = table.query(
KeyConditionExpression=Key('partition-key-name').eq(VALUE1) & Key('sort-key-name').eq(VALUE2)
)
#Query:Spécification d'index (& clé de partition uniquement)
res = table.query(
IndexName='index-name',
KeyConditionExpression=Key('partition-key-name').eq(VALUE3)
)
res['Count'] #Nombre d'acquisitions
res['Items'] #Liste des objets acquis
Lors de la création d'un index avec DynamoDB, il s'appelle ** projection ** et ressemble à une image dans laquelle une table de copie est créée. Selon la taille des données de la table, seule la clé doit être reflétée plutôt que toutes les colonnes. Obtenir la clé avec l'index → Récupérer à nouveau l'élément avec la clé Il vaut peut-être mieux faire le traitement.
③ Step Functions Un flux de processus peut être construit à l'aide de la notation JSON. (Il n'est pas limité à Lambda qui peut être combiné) Vous pouvez écrire un branchement conditionnel et un traitement parallèle, ce qui vous permet de créer un flux de manière assez flexible. Pour être honnête, il est difficile de lire et d'écrire s'il ne s'agit que d'une définition, mais c'est utile car cela vérifie la syntaxe et crée automatiquement un diagramme de flux. Il semble que le flux créé s'appelle ** state machine **.
cf. Système de traitement par lots sans serveur créé avec AWS Step Functions
Lors de l'appel de Lambda, «Invoke Function» est requis dans la stratégie IAM.
Les fonctions de ① et ② sont en fait un peu plus subdivisées, nous allons donc créer un flux ici. Le traitement série et le traitement parallèle sont à peu près les suivants.
Traitement série de la fonction Lambda
{
"Comment": "Commentaire Commentaire Commentaire",
"StartAt": "First Process",
"States": {
"First Process": {
"Type": "Task",
"Resource": "arn:aws:lambda:REGION:ACCOUNT:function:FUNCTION_NAME",
"Next": "Second Process"
},
"Second Process": {
"Type": "Task",
"Resource": "arn:aws:lambda:REGION:ACCOUNT:function:FUNCTION_NAME",
"End": true
}
}
}
Traitement parallèle des fonctions Lambda
{
"Comment": "Commentaire Commentaire Commentaire",
"StartAt": "Main Process",
"States": {
"Main Process": {
"Type": "Parallel",
"End": true,
"Branches": [
{
"StartAt": "Branch Process A",
"States": {
"Branch Process A": {
"Type": "Task",
"Resource": "arn:aws:lambda:REGION:ACCOUNT:function:FUNCTION_NAME",
"End": true
}
}
},
{
"StartAt": "Branch Process B",
"States": {
"Branch Process B": {
"Type": "Task",
"Resource": "arn:aws:lambda:REGION:ACCOUNT:function:FUNCTION_NAME",
"End": true
}
}
}
]
}
}
}
CloudWatch Events
Les machines d'état peuvent être déclenchées par CloudWatch Events.
Lors de l'exécution d'un planning, il est spécifié par l'expression rate
ou l'expression cron
,
J'étais tellement accro à «cron», donc juste un rappel.
--6 éléments de minute, heure, jour, mois, jour, année
cf. Schedule Expressions for Rules
Tous ces services étaient nouveaux pour moi, mais ils étaient très intéressants. Soyons sans serveur!
Et quand je l'essaye, je me demande quoi faire avec la gestion de code Lambda et l'environnement de développement ...?
Recommended Posts