Auparavant, j'ai écrit un article API obtenue en convertissant le catalogue de données ouvertes de la préfecture de Shizuoka (csv) en données (json) du site de contre-mesures COVID-19. C'était. L'API créée ici est interrogée avec Lambda et, en cas de modification des données, elle est stockée dans S3, les informations sont également enregistrées dans DynamoDB et Slack est informé de la modification. C'était. C'est la prétention ETL. En plus de la ville de Hamamatsu, les données de la ville de Shizuoka ont également été surveillées.
Le code de fonction de Lambda (Python) est disponible dans ce référentiel. https://github.com/ww2or3ww/covid19_shizuoka-opendata_notifier
Ajoutez un déclencheur. Réglez la gâchette. Sélectionnez EventBridge (ClowdWatch Event) et créez une règle. Veuillez expliquer comment rédiger la formule de planification. Seulement ça.
Lors de la publication du code source Python sur GitHub, etc., définissez les informations que vous ne souhaitez pas publier dans la variable d'environnement.
Sélectionnez Modifier "Variables d'environnement" sous le code de fonction de Lambda. Ajoutez les clés et les valeurs des variables d'environnement. Sera ajouté.
Lorsque vous l'obtenez à partir du code Python, cela ressemble à ceci:
api_address = os.environ["API_ADDRESS_CSV2JSON"]
Appelez cette API pour obtenir les données JSON. Le code ressemble à ceci.
import json
import requests
class CityInfo:
def __init__(self, city, queryParam):
self.city = city
self.queryParam = queryParam
CityInfo(
"hamamatsu",
"main_summary:5ab47071-3651-457c-ae2b-bfb8fdbe1af1,patients:5ab47071-3651-457c-ae2b-bfb8fdbe1af1,patients_summary:5ab47071-3651-457c-ae2b-bfb8fdbe1af1,inspection_persons:d4827176-d887-412a-9344-f84f161786a2,contacts:1b57f2c0-081e-4664-ba28-9cce56d0b314"
)
apiResponse = requests.get("{0}?type={1}".format(API_ADDRESS_CSV2JSON, queryParam), auth=AUTH, headers={"x-api-key": API_KEY_CSV2JSON})
retJson = json.loads(apiResponse.text)
Les spécifications du tableau sont comme ça.
city(PK) | Type de ville(hamamatsu/shizuoka-shi) |
type(SK) | type de données(Types tels que le nombre de patients positifs et le nombre de personnes testées) |
id | ID du catalogue de données ouvertes de la préfecture de Shizuoka |
name | nom du type |
path | Chemin S3 où les dernières données sont stockées(Clé) |
update | La date et l'heure de la dernière mise à jour des données |
Étant donné que chaque donnée JSON a une date de dernière mise à jour, comparez cette date et cette heure avec la mise à jour DynamoDB et mettez à jour l'enregistrement s'il a été mis à jour. (Insérer pour les nouvelles données.)
import boto3
from boto3.dynamodb.conditions import Key
DYNAMODB_NAME = os.environ["DYNAMODB_NAME"]
DYNAMO_TABLE = boto3.resource("dynamodb").Table(DYNAMODB_NAME)
def processType(cityInfo, retJson, typeId):
date = retJson[type]["date"]
listItem = typeId.split(":")
type = listItem[0]
id = listItem[1]
record = selectItem(cityInfo.city, type)
if(record["Count"] is 0):
path = uploadFile(cityInfo.city, type, id, date, retJson[type])
insertItem(cityInfo.city, type, id, date, TYPE_NAME[type], path)
elif record["Items"][0]["update"] != date:
path = uploadFile(cityInfo.city, type, id, date, retJson[type])
updateItem(cityInfo.city, type, id, date, path)
def selectItem(city, type):
return DYNAMO_TABLE.query(
KeyConditionExpression=Key("city").eq(city) & Key("type").eq(type)
)
def insertItem(city, type, id, date, name, path):
DYNAMO_TABLE.put_item(
Item = {
"city": city,
"type": type,
"id": id,
"update" : date,
"name" : name,
"path" : path
}
)
def updateItem(city, type, id, date, path):
DYNAMO_TABLE.update_item(
Key={
"city": city,
"type": type,
},
UpdateExpression="set #update = :update, #path = :path",
ExpressionAttributeNames={
"#update": "update",
"#path": "path"
},
ExpressionAttributeValues={
":update": date,
":path": path
}
)
J'ai essayé de stocker non seulement les dernières informations, mais aussi les informations d'historique dans un tableau séparé, mais je vais l'omettre.
Les données mises à jour seront téléchargées vers S3.
import boto3
S3 = boto3.resource('s3')
S3_BUCKET_NAME = os.environ["S3_BUCKET_NAME"]
def uploadFile(city, type, id, date, jsonData):
dt = datetime.strptime(date, '%Y/%m/%d %H:%M')
path = "data/{0}/{1}/{2}/{3}/{4}/{5}-{6}".format(city, type, dt.year, str(dt.month).zfill(2), str(dt.day).zfill(2), str(dt.hour).zfill(2), str(dt.minute).zfill(2))
objJson = S3.Object(S3_BUCKET_NAME, "{0}.json".format(path))
objJson.put(Body = json.dumps(jsonData, ensure_ascii=False, indent=2))
return path
Non seulement JSON mais aussi CSV des données d'origine sont acquis et enregistrés en tant qu'ensemble avec JSON, mais je vais l'omettre.
Il est accumulé dans S3 comme ça. En ce qui concerne le partitionnement, il aurait été préférable que CSV et JSON ne soient pas au même endroit, et quand je le regarde à nouveau, cela semble un peu subtil.
Nous informerons Slack de toute mise à jour. Le code Python ressemble à ceci:
import slackweb
SLACK_WEBHOOK_HAMAMATSU = os.environ["SLACK_WEBHOOK_HAMAMATSU"]
notifyText = "【{0}】\n{1}".format(cityInfo.city, notifyText)
notifyToSlack(SLACK_WEBHOOK_HAMAMATSU, notifyText)
def notifyToSlack(url, text):
slack = slackweb.Slack(url=url)
slack.notify(text=text)
Comment faire passer l'URL du WebHook à slackweb? Accédez à Incoming WebHooks. Vous pouvez obtenir l'URL du Webhook en sélectionnant le canal que vous souhaitez notifier et en cliquant sur le bouton "Ajouter l'intégration des WebHooks entrants". Vous pouvez également modifier l'icône et le nom ici.
Voici comment Slack est notifié.
J'ai remarqué que deux mois se sont écoulés depuis le Dernier message (15/04), donc j'écris un article à la hâte. C'est à peine sûr aujourd'hui le 14/06. Cet appareil lui-même a été achevé vers 4/20, et je l'utilise personnellement depuis.
En fait, j'ai commencé à créer ce mécanisme, dans l'espoir de notifier le canal Slack de code4hamamatsu et d'informer tout le monde de la mise à jour des données. À partir de la première quinzaine d'avril.
Cependant, j'ai un environnement dans lequel GitHub Actions interroge et valide toutes les modifications, puis la compilation Netlify s'exécute et le résultat est notifié au canal Slack, j'ai donc la possibilité de l'utiliser. Il n'y avait pas.
Pendant que je le faisais, j'étais un peu nerveux parce que j'ai dit: «Oh, je n'ai pas besoin de ça.
Préparé pour faire du matériel LT au JAWS-UG Hamamatsu AWS Study Group 2020 # 4 à la fin du mois d'avril afin que tout le monde sache au moins son existence. Je l'ai également mis à l'ordre du jour, mais je ne l'ai pas fait en un rien de temps.
Je pensais essayer d'écrire sur l'article de Qiita, mais il semble que cela fait deux mois depuis. Ah! Pauvre enfant! !!