(Une addition) C'était tout simplement gratuit, je le publierai donc le 9e jour du Calendrier de l'avent AWS Lambda et Serverless # 2 2019.
J'ai soudain pensé que je voudrais la notification LINE suivante des likes de Qiita,
Découvrez les articles liés! -> Nous avons construit un monde doux qui vous avertira LINE quand vous l'aimerez avec Qiita
La structure est simple (et l'article lui-même) est très utile.
Cette fois, je voulais créer une application événementielle à l'aide de services AWS tels que Lambda, j'ai donc modifié un peu la configuration et je l'ai imitée.
L'article original est implémenté avec la structure simple suivante.
Cette fois, nous avons reconsidéré la configuration comme suit afin que le traitement de Lambda puisse être effectué de manière plus concise et plus rapide.
--Scraping-> Passer à l'agrégation à l'aide de l'API Qiita --Éliminer le sommeil pour réduire le temps de traitement
Étant donné que j'utilise Lambda et Dynamo DB pour la première fois cette fois, je sens fermement que je veux essayer diverses fonctions. Je ne peux donc pas nier le sentiment d'en faire trop. Je pense que l'article original est plus intelligent.
J'ai utilisé ce qui suit:
Qiita API v2 L'API est disponible pour Qiita (documentation de l'API Qiita v2). Vous pouvez facilement obtenir diverses informations. Cependant, malheureusement, aucune API ne peut recevoir de notifications, nous réaliserons donc des notifications en combinant les éléments suivants.
une fonction | point final | avantage | Désavantage |
---|---|---|---|
Obtenez une liste de j'aime | GET /api/v2/authenticated_user/items | Vous pouvez obtenir le nombre total de likes jusqu'à 100 articles avec un seul get | La mémoire est consommée car la réponse contient le corps |
Obtenez l'ID utilisateur que vous avez aimé | GET /api/v2/items/:item_id/likes | Vous pouvez obtenir des utilisateurs qui aiment chaque article | Vous devez obtenir autant d'articles que nécessaire |
Lambda ne facture que le temps d'utilisation, la réduction du temps de traitement est donc prioritaire par rapport aux notifications strictes. Je pense que c'est le plus strict d'obtenir tous les identifiants utilisateur que vous aimez régulièrement et de faire la différence, mais comme l'API Qiita v2 ne peut obtenir que les identifiants utilisateur que vous aimez pour chaque article, API autant que le nombre d'articles Vous devrez frapper. Qiita aime les tendances suivantes. (Référence: 7 "étonnants" trouvés dans l'analyse des articles Qiita de l'année dernière)
- Incroyable ① La valeur moyenne des "j'aime" est de 8,02. Plus de la moitié est 0-2
- Incroyable ② La distribution des "likes" est tellement biaisée qu'elle ne peut pas être représentée graphiquement.
Par conséquent, on pense que la mise à jour des likes est également biaisée vers un article spécifique. Il semble que cela ne vaille pas la peine de prendre la différence une par une. Par conséquent, j'essaie d'obtenir uniquement le nombre de likes à la fois en obtenant la liste, de limiter uniquement les articles dont le nombre de likes a changé et de frapper l'API plusieurs fois pour obtenir l'ID utilisateur que j'ai aimé.
Nous ne regardons que le nombre total de likes par article, donc s'il y a une annulation d'un like, ce sera moins rigoureux, mais au détriment de cela.
Lambda + Dynamo DB Stream Cette fois, en gros, il suffit d'exécuter le processus régulièrement (une fois toutes les 15 minutes, etc.). Avec un serveur Web standard, vous perdez la plupart de votre temps à démarrer. S'il s'agit d'un service de paiement à l'utilisation courant, il sera perdu. Cependant, Lambda ne facture que le temps de calcul réel utilisé et n'engage pas de frais lorsque le code n'est pas en cours d'exécution.
En raison de la nature de l'utilisation des ressources autant que nécessaire, vous pouvez sélectionner divers déclencheurs pour l'exécution du traitement. Les déclencheurs suivants conviennent à cette exigence.
--CloudWatch Events: exécution périodique --Dynamo DB Stream: lorsque le DB est modifié, il reçoit les données modifiées et exécute le processus.
LINE Notify
Vous pouvez notifier LINE simplement en mettant le jeton d'accès dans l'en-tête et en POSTANT le message. Il est également très facile d'obtenir un jeton d'accès.
La procédure de mise en œuvre est la suivante. Nous réimprimerons également le diagramme pour comprendre le rôle de chaque implémentation.
Je voudrais présenter un extrait du code utilisé dans Lambda. Vous pouvez voir le code que vous utilisez réellement depuis ici.
Je vais l'omettre car il s'écarte du sujet principal.
Ce qui suit est très utile et recommandé. Le contenu de cet article est suffisant si vous pouvez vous en tenir à la section «Test sur Lambda». (Référence: Premier développement d'API utilisant Lambda et DynamoDB)
En Python, vous souhaitez utiliser Requests, mais dans Lambda vous ne pouvez pas utiliser pip install, donc essayer d'utiliser autre chose que des fonctions intégrées est un problème. (Si vous souhaitez toujours l'utiliser, ici) Donc, tout d'abord, préparez une fonction pour obtenir et publier une requête avec urllib. L'interface est aussi proche que possible des demandes. Les fonctions req_get et req_post prennent les mêmes arguments que les fonctions requests.get et requests.post. En outre, l'objet Response peut obtenir le contenu de la réponse json avec `` .body```.
import json
from urllib.request import Request
from urllib import request, parse, error
from http.client import HTTPResponse
class Response():
"""Http Response Object"""
def __init__(self, res: HTTPResponse):
self.body = self._json(res)
self.status_code = self._status_code(res)
self.headers = self._headers(res)
def _json(self, res: HTTPResponse):
return json.loads(res.read())
def _status_code(self, res: HTTPResponse) -> int:
return res.status
def _headers(self, res: HTTPResponse) -> Dict[str, str]:
return dict(res.getheaders())
def req_get(url: str, headers=None, params=None) -> Response:
"""get request. simplified request function of Requests
:return: Response object
"""
if params:
url = '{}?{}'.format(url, parse.urlencode(params))
req = Request(url, headers=headers, method='GET')
with request.urlopen(req) as res:
response = Response(res)
return response
def req_post(url: str, data: Dict[str, Any], headers=None) -> Response:
"""post request. simplified request function of Requests
:return: Response object
"""
if headers.get('Content-Type') == 'application/x-www-form-urlencoded':
encoded_data = parse.urlencode(data).encode()
else:
encoded_data = json.dumps(data).encode()
req = Request(url, data=encoded_data, headers=headers, method='POST')
with request.urlopen(req) as res:
response = Response(res)
return response
Documentation et [Assistance](https://help.qiita.com/en/articles/ (qiita-search-options) et appuyez sur GET / api / v2 / authenticated_user / items ''. Ici, j'utilise la fonction
serialize_response '' qui supprime les valeurs inutiles (seuls l'ID et le titre et le nombre de likes sont requis). De plus, si vous avez un grand nombre d'articles, vous aurez besoin de nations de page. Par conséquent, puisque le nombre total d'articles de l'utilisateur est inclus dans l'en-tête, le nombre maximum de nations de page est calculé par le premier get, et le get est répété.
def serialize_response(response: Response) -> List[Dict[str, Any]]:
"""serialize response of Qiita API v2"""
keys = ['id', 'title', 'likes_count']
return [
{f: resp.get(f) for f in keys} for resp in response.body
]
def get_item(url: str, headers: Dict[str, str], **param) -> List[Dict[str, Any]]:
"""get a item by Qiita API v2 and return the list of serialized response (dictionary)"""
response = req_get(url, headers=headers, params=param)
return serialize_response(response)
def get_items(token: str, per_page=1, url='https://qiita.com/api/v2/authenticated_user/items') -> List[Dict[str, Any]]:
"""Pagenate pour obtenir tous les articles des utilisateurs authentifiés"""
headers = {'Authorization': 'Bearer {}'.format(token)}
response: Response = req_get(url, headers=headers, params={'page': 1, 'per_page': per_page})
items = serialize_response(response)
tot_count = int(response.headers['Total-Count'])
tot_pages = ceil(tot_count / per_page)
if tot_pages <= 1:
return items
for page in range(2, tot_pages + 1):
items += get_item(url, headers, page=page, per_page=per_page)
return items
Appuyez sur Présentation de la table Dynamo DB / Détails du flux / Gérer les flux et vous devriez voir quelque chose comme ceci: S'il est défini, les données de flux (données avant et après la modification) seront diffusées lors de la mise à jour de Dynamo DB. (Dans 5., ces données de flux sont utilisées comme déclencheur pour permettre l'exécution de Lambda.)
Mettez à jour Dynamo DB avec la fonction suivante. Si l'ID n'est pas dans Dynamo DB, il est nouvellement créé, si l'ID existe et que le nombre de likes (iine) est modifié, il est mis à jour, sinon il n'y a pas de changement. Seuls les éléments nouvellement créés et mis à jour seront des données de flux.
import boto3
from botocore.exceptions import ClientError
def update_logs(items: List[Dict[str, Any]]):
"""Update the number of iine in Dynamo DB
If item ID do not exist in Dynamo DB, insert them in it
"""
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('iine_qiita_logs')
for item in items:
ids = item.get('id')
title = item.get('title')
iine = item.get('likes_count')
try:
response = table.update_item(
Key={'ids': ids},
UpdateExpression="set iine = :newiine, title = :title",
ConditionExpression="attribute_not_exists(ids) or iine <> :newiine",
ExpressionAttributeValues={
":newiine": iine,
":title": title
},
)
except ClientError as e:
if e.response['Error']['Code'] == "ConditionalCheckFailedException":
print(e.response['Error']['Message'])
else:
raise
Mettez 2 à 3 codes ensemble sur Lambda. Ensuite, déclenchez "Cloud Watch Events". (Même si c'est évidemment trop fréquent à cause de Zako: cry :) De 9h à 1h, le traitement se fait toutes les 15 minutes.
Ensuite, les éléments Dynamo DB suivants seront mis à jour périodiquement et les données de flux seront diffusées en continu.
Construisez Lambda du côté des notifications. Étant donné que les données de flux mises à jour ont commencé à s'écouler jusqu'à 4., Lambda qui reçoit les données de flux et exécute le processus est requis. Réglez simplement le déclencheur sur Dynamo DB comme indiqué ci-dessous.
Les données de flux peuvent être obtenues à partir du premier argument du gestionnaire spécifié dans Lambda comme suit. (Référence: Exécuter Lambda déclenché par DynamoDB Stream)
def serialize_record(record: Dict[str, Any]) -> Dict[str, Any]:
"""serialize data of Dynamo DB Stream"""
if record.get('eventName') != 'MODIFY':
return {}
past = record.get('dynamodb', {}).get('OldImage')
past_iine = int(past.get('iine', {}).get('N', 0))
ids = past.get('ids', {}).get('S', '')
new = record.get('dynamodb', {}).get('NewImage')
title = new.get('title', {}).get('S', '')
new_iine = int(new.get('iine', {}).get('N', 0))
return {
'ids': ids,
'title': title,
'new_iine': new_iine,
'past_iine': past_iine
}
def lambda_handler(event, context):
"""main handler for Lambda"""
records = event.get('Records', [])
for record in records:
serialized_data = serialize_record(record)
...
Maintenant que vous avez l'ID de l'article dont le nombre de likes a augmenté, récupérez l'ID utilisateur que vous avez aimé dans `` GET / api / v2 / items /: item_id / likes '' de l'API Qiita v2. Faire.
def serialize_response_name(response: Response, new_size: int, num: int, title: str) -> Dict[str, Any]:
"""serialize iine data of Qiita API v2"""
size = new_size - num
if size <= 0:
users: List[str] = []
else:
new_iine = response.body[:size]
users = [
resp.get('user', {}).get('id') for resp in new_iine
]
return {
'title': title,
'users': users
}
def get_new_iine(item: Dict[str, Any], token: str) -> Dict[str, Any]:
"""HTTP request to Qiita API v2"""
headers = {'Authorization': 'Bearer {}'.format(token)}
ids = item.get('ids', '')
past_iine = item.get('past_iine', 0)
new_iine = item.get('new_iine', 0)
url = f'https://qiita.com/api/v2/items/{ids}/likes'
response = req_get(url, headers=headers)
title: str = item.get('title', '')
resp = serialize_response_name(response, new_iine, past_iine, title)
return resp
Vous pouvez obtenir un jeton d'accès en vous connectant, en appuyant sur Émettre Accès Talk depuis Ma page, en appuyant sur «Recevoir des notifications de LINE Notify 1: 1», puis sur «Émettre».
Tout ce que vous avez à faire est de le formater de manière appropriée et de le publier.
def deserialize_response_name(response: Dict[str, Any], max_length=20) -> str:
"""deserialize text for LINE Notify
:param max_length: max sentence length
"""
names = ", ".join(response.get('users', []))
title = response.get('title', '')
title = f"{title}" if len(title) <= max_length else f"{title[:max_length]}..."
return f"\n{names}Mais"{title}Je l'ai aimé."
def send_notification(message: str, token: str):
"""send notification by LINE notify"""
url = 'https://notify-api.line.me/api/notify'
headers = {
'Authorization': 'Bearer {}'.format(token),
'Content-Type': 'application/x-www-form-urlencoded'
}
msg = {'message': message}
response = req_post(url, data=msg, headers=headers)
return response.body
C'est tout pour les besoins de cet article. Après cela, si vous définissez la fonction suivante dans le gestionnaire, la notification s'exécutera.
def lambda_handler(event, context):
"""main handler for Lambda"""
qiita_token = os.environ["QIITA_TOKEN"]
line_token = os.environ["LINE_TOKEN"]
records = event.get('Records', [])
for record in records:
serialized_data = serialize_record(record)
if not serialized_data:
continue
new_iines = get_new_iine(serialized_data, qiita_token)
if len(new_iines.get('users')) == 0:
continue
send_notification(deserialize_response_name(new_iines), line_token)
return {
'statusCode': 200,
}
Exemple de notification:
Vous pouvez désormais recevoir des notifications LINE en toute sécurité. J'ai également estimé que c'était un bon thème pour démarrer avec le développement d'applications événementielles à l'aide d'AWS. Je remercie l'auteur de l'histoire originale pour référence. .. ..
Merci d'avoir lu pour moi jusqu'à la fin! J'espère que cela vous sera utile! Refs