(Note) A slot happened to be open, so this post also appears as day 9 of the AWS Lambda and Serverless #2 Advent Calendar 2019.
I suddenly found myself wanting LINE notifications for likes on my Qiita posts, and then discovered a related article: "We built a gentle world that notifies you on LINE when someone likes your Qiita post".
Its architecture is simple, and the article itself is very helpful.
This time, I wanted to build an event-driven application using AWS services such as Lambda, so I changed the configuration a little and tried to imitate it.
The original article is implemented with the following simple structure:

- Scrape Qiita's notification column
- Diff against the notification log saved in the DB
- Send the difference to IFTTT's Webhooks (IFTTT notifies LINE)
- Run the above series of steps periodically on AWS Lambda
This time, I reworked the configuration as follows so that the Lambda processing finishes more concisely and quickly:

- Scraping -> aggregation using the Qiita API
- Eliminate sleeps to reduce processing time
- Make the environment easy to build
- One Lambda for everything -> split into two Lambdas (a data-collection function and a notification function)
- Diff against past logs with DynamoDB Streams (no diff script needed)
Since this is my first time using Lambda and DynamoDB, I mostly wanted to try out as many features as possible, so I can't deny that this is overengineered. The original article's approach is smarter.
I used the following services.

Qiita API v2
Qiita provides an API (Qiita API v2 documentation) that makes it easy to fetch various information. Unfortunately, there is no endpoint for notifications, so we implement notifications by combining the following two.
| Function | Endpoint | Advantage | Disadvantage |
|---|---|---|---|
| Get a list of your articles with like counts | GET /api/v2/authenticated_user/items | One GET returns the like counts of up to 100 articles | The response includes article bodies, so it consumes memory |
| Get the user IDs who liked an article | GET /api/v2/items/:item_id/likes | Returns the users who liked each article | Must be called once per article |
Lambda bills only for the time you actually use, so I prioritized reducing processing time over perfectly accurate notifications. The most rigorous approach would be to periodically fetch all the user IDs who liked each article and take the difference, but Qiita API v2 can only return the likers per article, so that would require one API call per article. Qiita likes have the following tendencies. (Reference: 7 "surprises" found by analyzing a year of Qiita articles)
- Surprise (1): the average number of likes is 8.02, and more than half of articles have 0-2
- Surprise (2): the distribution of likes is so skewed that it can hardly be graphed
Likes updates are therefore likely to be concentrated on a few specific articles, and diffing every article one by one isn't worth it. So I first fetch only the like counts in a single listing call, narrow down to the articles whose counts have changed, and only then call the per-article API to get the liker IDs.
Since we only compare the total like count per article, we lose some rigor when a like is cancelled, but that is the trade-off.
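The narrowing-down step can be sketched like this (a minimal illustration with made-up article IDs and counts; the actual implementation later delegates this diff to DynamoDB's ConditionExpression):

```python
# Made-up snapshots of like counts per article ID
past_counts = {'art1': 5, 'art2': 0, 'art3': 12}
new_counts = {'art1': 7, 'art2': 0, 'art3': 12}

# Only articles whose like count increased need a per-article API call
changed = [ids for ids, n in new_counts.items() if n > past_counts.get(ids, 0)]
print(changed)  # ['art1']
```

With a skewed like distribution, `changed` stays small, so only a handful of per-article requests are needed per run.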
Lambda + DynamoDB Streams
This time, it is basically enough to run the process periodically (say, once every 15 minutes). With a regular web server, most of the uptime would be wasted idling, which is a loss under typical pay-as-you-go pricing. Lambda, however, charges only for the compute time you actually use and nothing while your code isn't running.
Because Lambda uses resources only as needed, you can choose from a variety of execution triggers. The following two fit this requirement:

- CloudWatch Events: periodic execution
- DynamoDB Streams: when the table changes, the changed data is delivered and processing runs
LINE Notify
You can send a LINE notification just by putting an access token in the header and POSTing a message. Getting an access token is also very easy.
The implementation procedure is as follows. I'll also repost the block diagram so the role of each part is clear.
Below are excerpts of the code used in Lambda. The full code actually in use is available here.
I will omit this part, as it strays from the main subject.
The following reference is very helpful and recommended; for this article, reading up to the section "Testing on Lambda" is sufficient. (Reference: First API development using Lambda and DynamoDB)
In Python you would normally reach for Requests, but on Lambda you can't simply pip install, so using anything beyond the standard library is a hassle. (If you still want to, see here.)
So first, I prepare GET and POST request helpers built on urllib, with an interface as close to Requests as possible: the `req_get` and `req_post` functions take the same arguments as `requests.get` and `requests.post`, and the `Response` object exposes the parsed JSON body via `.body`.
```python
import json
from typing import Any, Dict
from urllib.request import Request
from urllib import request, parse, error
from http.client import HTTPResponse


class Response():
    """HTTP Response object"""

    def __init__(self, res: HTTPResponse):
        self.body = self._json(res)
        self.status_code = self._status_code(res)
        self.headers = self._headers(res)

    def _json(self, res: HTTPResponse):
        return json.loads(res.read())

    def _status_code(self, res: HTTPResponse) -> int:
        return res.status

    def _headers(self, res: HTTPResponse) -> Dict[str, str]:
        return dict(res.getheaders())


def req_get(url: str, headers=None, params=None) -> Response:
    """GET request. Simplified version of Requests' get
    :return: Response object
    """
    if params:
        url = '{}?{}'.format(url, parse.urlencode(params))
    req = Request(url, headers=headers or {}, method='GET')
    with request.urlopen(req) as res:
        response = Response(res)
    return response


def req_post(url: str, data: Dict[str, Any], headers=None) -> Response:
    """POST request. Simplified version of Requests' post
    :return: Response object
    """
    headers = headers or {}
    if headers.get('Content-Type') == 'application/x-www-form-urlencoded':
        encoded_data = parse.urlencode(data).encode()
    else:
        encoded_data = json.dumps(data).encode()
    req = Request(url, data=encoded_data, headers=headers, method='POST')
    with request.urlopen(req) as res:
        response = Response(res)
    return response
```
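As a quick sanity check of how `req_get` behaves, here is the query-string assembly it performs before issuing the request (this snippet only exercises the URL building, not the network call):

```python
from urllib import parse

# req_get appends params as a query string before issuing the GET
url = 'https://qiita.com/api/v2/authenticated_user/items'
params = {'page': 1, 'per_page': 100}
full_url = '{}?{}'.format(url, parse.urlencode(params))
print(full_url)  # https://qiita.com/api/v2/authenticated_user/items?page=1&per_page=100
```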
Consulting the documentation and [Support](https://help.qiita.com/en/articles/qiita-search-options), hit `GET /api/v2/authenticated_user/items`. Here I use a `serialize_response` function that discards unneeded values (only the ID, title, and like count are required). If you have many articles, pagination is also needed: the response headers contain the user's total article count, so the first GET computes the number of pages and the GET is repeated for each remaining page.
```python
from math import ceil
from typing import Any, Dict, List


def serialize_response(response: Response) -> List[Dict[str, Any]]:
    """Serialize a response of Qiita API v2"""
    keys = ['id', 'title', 'likes_count']
    return [
        {f: resp.get(f) for f in keys} for resp in response.body
    ]


def get_item(url: str, headers: Dict[str, str], **param) -> List[Dict[str, Any]]:
    """GET one page via Qiita API v2 and return the serialized response (list of dicts)"""
    response = req_get(url, headers=headers, params=param)
    return serialize_response(response)


def get_items(token: str, per_page=1,
              url='https://qiita.com/api/v2/authenticated_user/items') -> List[Dict[str, Any]]:
    """Paginate to get all articles of the authenticated user"""
    headers = {'Authorization': 'Bearer {}'.format(token)}
    response: Response = req_get(url, headers=headers, params={'page': 1, 'per_page': per_page})
    items = serialize_response(response)
    # The Total-Count header tells us how many pages remain
    tot_count = int(response.headers['Total-Count'])
    tot_pages = ceil(tot_count / per_page)
    if tot_pages <= 1:
        return items
    for page in range(2, tot_pages + 1):
        items += get_item(url, headers, page=page, per_page=per_page)
    return items
```
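The pagination math above hinges on the `Total-Count` response header. For example, with hypothetical counts:

```python
from math import ceil

# Suppose the Total-Count header reports 250 articles and we fetch 100 per page
tot_count = 250
per_page = 100
tot_pages = ceil(tot_count / per_page)
print(tot_pages)  # 3: page 1 is fetched up front, pages 2..3 in the loop
pages_to_fetch = list(range(2, tot_pages + 1))  # [2, 3]
```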
If you open the DynamoDB table's Overview / Stream details / Manage Stream, you will see the following. Once enabled, stream data (the item before and after each change) flows whenever DynamoDB is updated. (In step 5, this stream data triggers Lambda.)
DynamoDB is updated with the following function: if an ID is not yet in the table, the item is newly created; if the ID exists and its like count (iine) has changed, it is updated; otherwise nothing happens. Only created and updated items appear in the stream.
```python
from typing import Any, Dict, List

import boto3
from botocore.exceptions import ClientError


def update_logs(items: List[Dict[str, Any]]):
    """Update the number of iine (likes) in DynamoDB.
    If an item ID does not exist in DynamoDB yet, insert it.
    """
    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table('iine_qiita_logs')
    for item in items:
        ids = item.get('id')
        title = item.get('title')
        iine = item.get('likes_count')
        try:
            table.update_item(
                Key={'ids': ids},
                UpdateExpression="set iine = :newiine, title = :title",
                # Write only for new items or changed like counts
                ConditionExpression="attribute_not_exists(ids) or iine <> :newiine",
                ExpressionAttributeValues={
                    ":newiine": iine,
                    ":title": title
                },
            )
        except ClientError as e:
            if e.response['Error']['Code'] == "ConditionalCheckFailedException":
                print(e.response['Error']['Message'])
            else:
                raise
```
Put the code from steps 2 and 3 together in one Lambda and set "CloudWatch Events" as its trigger. (Admittedly this is too frequent for my modest volume of likes :cry:.) Processing runs every 15 minutes from 9 AM to 1 AM.
With this in place, the DynamoDB items below are updated periodically and stream data starts flowing.
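As a sketch, a CloudWatch Events schedule expression roughly matching "every 15 minutes from 9 AM to 1 AM JST" might look like the following. Note this is an assumption about my setup, not copied from the console: schedule expressions are evaluated in UTC, so 9 AM-1 AM JST corresponds to roughly 0:00-16:00 UTC.

```
cron(0/15 0-15 * * ? *)
```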
Build the Lambda on the notification side. Since update stream data now flows as of step 4, we need a Lambda that receives the stream data and runs the processing. Just set DynamoDB as the trigger, as below.
Stream data arrives in the first argument of the handler specified in Lambda, as follows. (Reference: Run Lambda triggered by DynamoDB Stream)
```python
from typing import Any, Dict


def serialize_record(record: Dict[str, Any]) -> Dict[str, Any]:
    """Serialize a record of a DynamoDB Stream"""
    if record.get('eventName') != 'MODIFY':
        return {}
    past = record.get('dynamodb', {}).get('OldImage')
    past_iine = int(past.get('iine', {}).get('N', 0))
    ids = past.get('ids', {}).get('S', '')
    new = record.get('dynamodb', {}).get('NewImage')
    title = new.get('title', {}).get('S', '')
    new_iine = int(new.get('iine', {}).get('N', 0))
    return {
        'ids': ids,
        'title': title,
        'new_iine': new_iine,
        'past_iine': past_iine
    }


def lambda_handler(event, context):
    """main handler for Lambda"""
    records = event.get('Records', [])
    for record in records:
        serialized_data = serialize_record(record)
        ...
```
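For reference, a MODIFY stream record uses DynamoDB's typed attribute format (`{'N': ...}` for numbers, `{'S': ...}` for strings). A minimal hand-made example of pulling values out of one looks like this (the ID, title, and counts are made up):

```python
# A hand-made MODIFY record in DynamoDB Streams' typed format (values are made up)
record = {
    'eventName': 'MODIFY',
    'dynamodb': {
        'OldImage': {'ids': {'S': 'abc123'}, 'iine': {'N': '3'}, 'title': {'S': 'My article'}},
        'NewImage': {'ids': {'S': 'abc123'}, 'iine': {'N': '5'}, 'title': {'S': 'My article'}},
    },
}

past_iine = int(record['dynamodb']['OldImage']['iine']['N'])  # 3
new_iine = int(record['dynamodb']['NewImage']['iine']['N'])   # 5
print(new_iine - past_iine)  # 2 new likes to look up
```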
By now we have the IDs of the articles whose like count increased, so we fetch the liker user IDs from `GET /api/v2/items/:item_id/likes` of Qiita API v2.
```python
from typing import Any, Dict, List


def serialize_response_name(response: Response, new_size: int, num: int, title: str) -> Dict[str, Any]:
    """Serialize iine (like) data of Qiita API v2"""
    size = new_size - num
    if size <= 0:
        users: List[str] = []
    else:
        # The likes endpoint returns newest first, so the first `size` entries are the new likers
        new_iine = response.body[:size]
        users = [
            resp.get('user', {}).get('id') for resp in new_iine
        ]
    return {
        'title': title,
        'users': users
    }


def get_new_iine(item: Dict[str, Any], token: str) -> Dict[str, Any]:
    """HTTP request to Qiita API v2"""
    headers = {'Authorization': 'Bearer {}'.format(token)}
    ids = item.get('ids', '')
    past_iine = item.get('past_iine', 0)
    new_iine = item.get('new_iine', 0)
    url = f'https://qiita.com/api/v2/items/{ids}/likes'
    response = req_get(url, headers=headers)
    title: str = item.get('title', '')
    resp = serialize_response_name(response, new_iine, past_iine, title)
    return resp
```
You can get an access token by logging in to LINE Notify, pressing "Generate token" on My Page, selecting "1-on-1 chat with LINE Notify" as the recipient, and pressing "Generate".
All that's left is to format the message appropriately and POST it.
```python
from typing import Any, Dict


def deserialize_response_name(response: Dict[str, Any], max_length=20) -> str:
    """Deserialize text for LINE Notify
    :param max_length: max title length
    """
    names = ", ".join(response.get('users', []))
    title = response.get('title', '')
    # Truncate long titles so the notification stays readable
    title = f"{title}" if len(title) <= max_length else f"{title[:max_length]}..."
    return f'\n{names} liked "{title}".'


def send_notification(message: str, token: str):
    """Send a notification via LINE Notify"""
    url = 'https://notify-api.line.me/api/notify'
    headers = {
        'Authorization': 'Bearer {}'.format(token),
        'Content-Type': 'application/x-www-form-urlencoded'
    }
    msg = {'message': message}
    response = req_post(url, data=msg, headers=headers)
    return response.body
```
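A quick check of the message formatting (restating the truncation logic inline so the snippet is self-contained; the names and title are made up):

```python
max_length = 20
users = ['alice', 'bob']
title = 'A very long article title that keeps going'

# Same formatting as deserialize_response_name above
names = ", ".join(users)
title = title if len(title) <= max_length else f"{title[:max_length]}..."
message = f'\n{names} liked "{title}".'
print(message)  # alice, bob liked "A very long article ...".
```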
That covers the main content of this article. Finally, set the following function as the handler and notifications will run.
```python
import os


def lambda_handler(event, context):
    """main handler for Lambda"""
    qiita_token = os.environ["QIITA_TOKEN"]
    line_token = os.environ["LINE_TOKEN"]
    records = event.get('Records', [])
    for record in records:
        serialized_data = serialize_record(record)
        if not serialized_data:
            continue
        new_iines = get_new_iine(serialized_data, qiita_token)
        if len(new_iines.get('users')) == 0:
            continue
        send_notification(deserialize_response_name(new_iines), line_token)
    return {
        'statusCode': 200,
    }
```
Notification example:
You can now reliably receive LINE notifications. This also felt like a good starter theme for event-driven application development on AWS. I'm grateful to the author of the original article.
Thank you for reading to the end! I hope you found it helpful!

Refs
- We built a gentle world that notifies you on LINE when someone likes your Qiita post
- Qiita API v2 documentation