Hooking AWS resources up to Lambda is convenient, but unpacking the event is tedious every time. Here, for receiving a DynamoDB Stream in Python, I want to be able to write everything other than the main processing concisely.
DynamoDB Streams and AWS Lambda Triggers - Amazon DynamoDB
The content of this article is published on PyPI as a library.
Using it should make the processing described here easier.
- I want to register handler functions (lambdas included) for each event type and run them for every record.
- Because the function to call is decided by the event type, the handler itself needs no branching.
- Item data should be easy to retrieve. Incidentally, it arrives in DynamoDB's familiar typed format, so it is converted to a Python dict.
- (Optional) Exception handling.
So I would like to be able to write the entry point like this.
handler(plans)
def lambda_handler(event, context):
    ds = DynamoStreamDispatcher(event)
    ds.on_insert.append(lambda rec: print(rec.event_name))  # lambdas are OK
    ds.on_remove.append(after_remove1)
    ds.on_remove.append(after_remove2)  # multiple handlers are OK
    ds.on_modify.append(after_modify)
    ds.dispatch()
    return True
The idea is to register handlers and then dispatch.
For now, I implemented what is needed to make the lambda_handler sketched above work.
lambda_function.py
from __future__ import print_function
import json
from boto3.dynamodb.types import TypeDeserializer
deser = TypeDeserializer()
print('Loading function')
class DeRecord:
    ## Deserialized Item
    def __init__(self, rec):
        self.event_name = rec['eventName']
        self.old = self._desi(rec['dynamodb'].get('OldImage'))
        self.new = self._desi(rec['dynamodb'].get('NewImage'))

    def _desi(self, image):
        # Convert a DynamoDB-typed image to a plain dict; a missing image becomes {}.
        d = {}
        if image:
            for key in image:
                d[key] = deser.deserialize(image[key])
        return d

class DynamoStreamDispatcher:
    def __init__(self, event):
        self.on_insert = []
        self.on_remove = []
        self.on_modify = []
        self.records = []
        for r in event['Records']:
            # Each raw record is converted into a DeRecord.
            self.records.append(DeRecord(r))
        self.raw = event

    def dispatch(self):
        """
        Synchronous dispatcher
        """
        results = []
        for r in self.records:
            # Look up the handler list first, so that an AttributeError raised
            # inside a handler is not mistaken for an unknown event.
            runners = getattr(self, 'on_' + r.event_name.lower(), None)
            if runners is None:
                print("Unknown event " + r.event_name)
                continue
            for runner in runners:
                results.append(runner(r))
        return results

## From here on: sample handler functions for an individual Lambda. The argument is a processed record.
def after_remove1(rec):
    print("deleted")
    return None

def after_remove2(rec):
    print(rec.old)
    return None

def after_modify(rec):
    print("key updated...")
    print(rec.old['Message'])
    print(rec.new['Message'])
    return None

def lambda_handler(event, context):
    ds = DynamoStreamDispatcher(event)
    ds.on_insert.append(lambda rec: print(rec.event_name))
    ds.on_remove.append(after_remove1)
    ds.on_remove.append(after_remove2)
    ds.on_modify.append(after_modify)
    ds.dispatch()
    return True
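To make the conversion step in DeRecord more concrete, here is a minimal sketch of what turning DynamoDB's typed attributes into plain Python values involves. It is a hand-rolled illustration, not boto3's implementation: the names `deserialize_attr` and `deserialize_image` are hypothetical, and the binary types are left out for brevity. In the Lambda code itself, boto3's TypeDeserializer does this job.

```python
from decimal import Decimal


def deserialize_attr(attr):
    """Convert one DynamoDB-typed value, e.g. {"N": "101"}, to a Python value.

    Hypothetical helper for illustration; binary types (B, BS) are omitted.
    """
    (type_tag, value), = attr.items()  # each attribute carries exactly one type tag
    if type_tag == "S":
        return value
    if type_tag == "N":
        return Decimal(value)  # numbers arrive as strings
    if type_tag == "BOOL":
        return value
    if type_tag == "NULL":
        return None
    if type_tag == "L":
        return [deserialize_attr(v) for v in value]
    if type_tag == "M":
        return {k: deserialize_attr(v) for k, v in value.items()}
    if type_tag == "SS":
        return set(value)
    if type_tag == "NS":
        return {Decimal(v) for v in value}
    raise ValueError("unsupported DynamoDB type tag: " + type_tag)


def deserialize_image(image):
    """Convert a whole OldImage/NewImage mapping; a missing image becomes {}."""
    return {k: deserialize_attr(v) for k, v in (image or {}).items()}


# NewImage of the INSERT record in the sample event
new_image = {"Message": {"S": "New item!"}, "Id": {"N": "101"}}
item = deserialize_image(new_image)
print(item["Message"], item["Id"])
```

Numbers come back as Decimal, which matches the `Decimal('101')` visible in the execution log.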
Try running it with the DynamoDB Update sample event template (attached at the end).
START RequestId: 6ed79996-0ecc-11e7-8985-db0ca21254c3 Version: $LATEST
INSERT
key updated...
New item!
This item has changed
deleted
{u'Message': u'This item has changed', u'Id': Decimal('101')}
END RequestId: 6ed79996-0ecc-11e7-8985-db0ca21254c3
Each registered function ran as expected.
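The optional exception handling from the wish list is not part of the code above. One way it could look is sketched below, under stated assumptions: `dispatch_safely`, the `handlers` mapping, and the `Record` stand-in for DeRecord are all hypothetical names, not part of the library. Each handler runs inside its own try block, so one failing handler does not stop the remaining handlers or records.

```python
from collections import namedtuple

# Stand-in for DeRecord, just enough for this sketch (hypothetical).
Record = namedtuple("Record", ["event_name", "old", "new"])


def dispatch_safely(records, handlers, on_error=None):
    """Call every handler registered for each record's event name.

    handlers maps an event name such as "INSERT" to a list of callables.
    An exception raised by a handler is collected (and passed to on_error,
    if given) instead of aborting the loop.
    Returns (results, errors).
    """
    results, errors = [], []
    for rec in records:
        for handler in handlers.get(rec.event_name, []):
            try:
                results.append(handler(rec))
            except Exception as exc:  # deliberately broad: isolate handler failures
                errors.append((rec, handler, exc))
                if on_error is not None:
                    on_error(rec, exc)
    return results, errors


def broken_handler(rec):
    raise RuntimeError("boom")


records = [Record("INSERT", {}, {"Id": 101}), Record("REMOVE", {"Id": 101}, {})]
handlers = {
    "INSERT": [broken_handler, lambda rec: rec.new["Id"]],
    "REMOVE": [lambda rec: rec.old["Id"]],
}
results, errors = dispatch_safely(records, handlers)
print(results)
print(len(errors))
```

Whether to swallow errors like this is a design choice. If the Lambda function returns normally, the batch counts as processed, so a handler failure that should trigger a retry has to be re-raised after the loop.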
This is mostly a library for my own use, but if nothing similar exists on PyPI, I think I could register it and use it from there. After that, I would like to add things like storing the differences.
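As a rough idea of what computing the differences might involve, assuming it means comparing a record's old and new images, here is a small sketch. `diff_images` is a hypothetical helper and not part of the code above; it takes two already deserialized dicts such as `rec.old` and `rec.new`.

```python
_MISSING = object()  # sentinel: distinguishes "attribute absent" from a stored None


def diff_images(old, new):
    """Return {key: (old_value, new_value)} for every attribute that differs.

    An attribute absent on one side is reported as None on that side.
    """
    changes = {}
    for key in sorted(set(old) | set(new)):
        before = old.get(key, _MISSING)
        after = new.get(key, _MISSING)
        if before != after:
            changes[key] = (
                None if before is _MISSING else before,
                None if after is _MISSING else after,
            )
    return changes


# Images of the MODIFY record in the sample event, after deserialization
old = {"Message": "New item!", "Id": 101}
new = {"Message": "This item has changed", "Id": 101}
print(diff_images(old, new))
```

For an INSERT the old image is empty, so every attribute appears as `(None, value)`; for a REMOVE it is the other way round.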
Appendix: DynamoDB Update Sample Events
{
  "Records": [
    {
      "eventID": "1",
      "eventVersion": "1.0",
      "dynamodb": {
        "Keys": {"Id": {"N": "101"}},
        "NewImage": {
          "Message": {"S": "New item!"},
          "Id": {"N": "101"}
        },
        "StreamViewType": "NEW_AND_OLD_IMAGES",
        "SequenceNumber": "111",
        "SizeBytes": 26
      },
      "awsRegion": "us-west-2",
      "eventName": "INSERT",
      "eventSourceARN": "arn:aws:dynamodb:us-west-2:account-id:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899",
      "eventSource": "aws:dynamodb"
    },
    {
      "eventID": "2",
      "eventVersion": "1.0",
      "dynamodb": {
        "OldImage": {
          "Message": {"S": "New item!"},
          "Id": {"N": "101"}
        },
        "SequenceNumber": "222",
        "Keys": {"Id": {"N": "101"}},
        "SizeBytes": 59,
        "NewImage": {
          "Message": {"S": "This item has changed"},
          "Id": {"N": "101"}
        },
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "awsRegion": "us-west-2",
      "eventName": "MODIFY",
      "eventSourceARN": "arn:aws:dynamodb:us-west-2:account-id:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899",
      "eventSource": "aws:dynamodb"
    },
    {
      "eventID": "3",
      "eventVersion": "1.0",
      "dynamodb": {
        "Keys": {"Id": {"N": "101"}},
        "SizeBytes": 38,
        "SequenceNumber": "333",
        "OldImage": {
          "Message": {"S": "This item has changed"},
          "Id": {"N": "101"}
        },
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "awsRegion": "us-west-2",
      "eventName": "REMOVE",
      "eventSourceARN": "arn:aws:dynamodb:us-west-2:account-id:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899",
      "eventSource": "aws:dynamodb"
    }
  ]
}