Previously, I wrote an article about an API that converts the Shizuoka open data catalog (CSV) into the JSON data used by the COVID-19 countermeasures site. This time, I built a mechanism that polls that API with Lambda, accumulates the data in S3 when there is a change, saves the bookkeeping information in DynamoDB, and notifies Slack of the change. It's a sort of mini ETL. Besides Hamamatsu City, it also monitors data from Shizuoka City.
The Lambda function code (Python) is available in this repository. https://github.com/ww2or3ww/covid19_shizuoka-opendata_notifier
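As a rough sketch of the overall flow (simplified by me; CITY_INFO_LIST and getJsonFromApi are hypothetical names, the rest appear in the snippets below):

# Hypothetical outline of the Lambda entry point, not the exact repository code.
def lambda_handler(event, context):
    for cityInfo in CITY_INFO_LIST:                 # one CityInfo per monitored city
        retJson = getJsonFromApi(cityInfo)          # call the csv2json API (see below)
        for typeId in cityInfo.queryParam.split(","):
            processType(cityInfo, retJson, typeId)  # upload to S3 / update DynamoDB on change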
Add a trigger: select EventBridge (CloudWatch Events) and create a rule. I'll skip the details of how to write schedule expressions. That's all it takes.
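For reference, schedule expressions come in a rate form and a cron form; either of these, for example, would fire once an hour (the interval is my assumption, not necessarily what I used):

rate(1 hour)
cron(0 * * * ? *)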
When publishing Python source code to GitHub or elsewhere, put any information you don't want to expose into environment variables.
Select Edit "Environment Variables" under the Lambda function code. Add the keys and values of environment variables. Will be added.
Reading one from Python code looks like this:

import os

api_address = os.environ["API_ADDRESS_CSV2JSON"]
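Environment variables can also be set programmatically; a minimal sketch with boto3 (the function name and value are placeholders):

import boto3

boto3.client("lambda").update_function_configuration(
    FunctionName="covid19-notifier",  # placeholder function name
    # note: this replaces the function's whole Variables map
    Environment={"Variables": {"API_ADDRESS_CSV2JSON": "https://example.com/api"}},
)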
Call this API to get the JSON data. The code looks like this.
import os
import json
import requests

API_ADDRESS_CSV2JSON = os.environ["API_ADDRESS_CSV2JSON"]
API_KEY_CSV2JSON = os.environ["API_KEY_CSV2JSON"]

class CityInfo:
    def __init__(self, city, queryParam):
        self.city = city
        self.queryParam = queryParam

cityInfo = CityInfo(
    "hamamatsu",
    "main_summary:5ab47071-3651-457c-ae2b-bfb8fdbe1af1,patients:5ab47071-3651-457c-ae2b-bfb8fdbe1af1,patients_summary:5ab47071-3651-457c-ae2b-bfb8fdbe1af1,inspection_persons:d4827176-d887-412a-9344-f84f161786a2,contacts:1b57f2c0-081e-4664-ba28-9cce56d0b314"
)

# AUTH (the API's auth credentials) is also loaded from environment variables
apiResponse = requests.get(
    "{0}?type={1}".format(API_ADDRESS_CSV2JSON, cityInfo.queryParam),
    auth=AUTH,
    headers={"x-api-key": API_KEY_CSV2JSON}
)
retJson = json.loads(apiResponse.text)
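Judging from how the response is used below (retJson[type]["date"]), the JSON appears to be keyed by data type, with each entry carrying its last-updated date. A hypothetical sketch of the shape (dates made up, payload fields omitted):

retJson = {
    "main_summary":     {"date": "2020/06/14 09:00"},  # plus the type-specific payload
    "patients":         {"date": "2020/06/14 09:00"},
    "patients_summary": {"date": "2020/06/14 09:00"},
}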
The table spec looks like this.

city (PK) | city identifier (hamamatsu / shizuoka-shi)
type (SK) | type of data (number of positive patients, number of people tested, and so on)
id | Shizuoka open data catalog ID
name | display name of the data type
path | S3 path (key) where the latest data is stored
update | date and time the data was last updated
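For illustration, a table with this key schema could be created like so (the table name and billing mode are my assumptions):

import boto3

boto3.client("dynamodb").create_table(
    TableName="covid19-shizuoka-opendata",  # hypothetical table name
    KeySchema=[
        {"AttributeName": "city", "KeyType": "HASH"},   # partition key
        {"AttributeName": "type", "KeyType": "RANGE"},  # sort key
    ],
    AttributeDefinitions=[
        {"AttributeName": "city", "AttributeType": "S"},
        {"AttributeName": "type", "AttributeType": "S"},
    ],
    BillingMode="PAY_PER_REQUEST",
)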
Each piece of JSON data carries its last-updated date, so I compare that date with the update attribute in DynamoDB, and if the data is newer I update the record as well (or insert one for new data).
import os
import boto3
from boto3.dynamodb.conditions import Key

DYNAMODB_NAME = os.environ["DYNAMODB_NAME"]
DYNAMO_TABLE = boto3.resource("dynamodb").Table(DYNAMODB_NAME)

def processType(cityInfo, retJson, typeId):
    # typeId looks like "patients:5ab47071-...", i.e. "type:catalog-id"
    listItem = typeId.split(":")
    type = listItem[0]
    id = listItem[1]
    date = retJson[type]["date"]
    record = selectItem(cityInfo.city, type)
    if record["Count"] == 0:
        # first time this city/type is seen: upload to S3 and insert a record
        path = uploadFile(cityInfo.city, type, id, date, retJson[type])
        insertItem(cityInfo.city, type, id, date, TYPE_NAME[type], path)
    elif record["Items"][0]["update"] != date:
        # data was updated since last poll: upload and update the record
        path = uploadFile(cityInfo.city, type, id, date, retJson[type])
        updateItem(cityInfo.city, type, id, date, path)

def selectItem(city, type):
    return DYNAMO_TABLE.query(
        KeyConditionExpression=Key("city").eq(city) & Key("type").eq(type)
    )

def insertItem(city, type, id, date, name, path):
    DYNAMO_TABLE.put_item(
        Item = {
            "city": city,
            "type": type,
            "id": id,
            "update": date,
            "name": name,
            "path": path
        }
    )

def updateItem(city, type, id, date, path):
    DYNAMO_TABLE.update_item(
        Key={
            "city": city,
            "type": type,
        },
        # "update" and "path" are DynamoDB reserved words, so alias them
        UpdateExpression="set #update = :update, #path = :path",
        ExpressionAttributeNames={
            "#update": "update",
            "#path": "path"
        },
        ExpressionAttributeValues={
            ":update": date,
            ":path": path
        }
    )
I also store history, not just the latest data, in a separate table, but I'll omit that here.
The updated data is uploaded to S3.
import os
import json
import boto3
from datetime import datetime

S3 = boto3.resource("s3")
S3_BUCKET_NAME = os.environ["S3_BUCKET_NAME"]

def uploadFile(city, type, id, date, jsonData):
    # build a key like data/{city}/{type}/{yyyy}/{mm}/{dd}/{HH}-{MM}
    dt = datetime.strptime(date, "%Y/%m/%d %H:%M")
    path = "data/{0}/{1}/{2}/{3}/{4}/{5}-{6}".format(city, type, dt.year, str(dt.month).zfill(2), str(dt.day).zfill(2), str(dt.hour).zfill(2), str(dt.minute).zfill(2))
    objJson = S3.Object(S3_BUCKET_NAME, "{0}.json".format(path))
    objJson.put(Body=json.dumps(jsonData, ensure_ascii=False, indent=2))
    return path
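For example, hamamatsu patients data last updated at 2020/06/14 09:00 would be stored at data/hamamatsu/patients/2020/06/14/09-00.json.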
Not only the JSON but also the original CSV is fetched and saved as a set alongside it, but I'll omit that too.
Data accumulates in S3 like this. As for the partitioning, it would have been better not to put the CSV and JSON in the same place; looking at it again, it seems a bit questionable.
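For illustration (my own sketch, not how the bucket is actually laid out), splitting by format at the top of the key would give prefixes like:

data/json/hamamatsu/patients/2020/06/14/09-00.json
data/csv/hamamatsu/patients/2020/06/14/09-00.csv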
Notify Slack of any updates. The Python code looks like this:
import os
import slackweb

SLACK_WEBHOOK_HAMAMATSU = os.environ["SLACK_WEBHOOK_HAMAMATSU"]

def notifyToSlack(url, text):
    slack = slackweb.Slack(url=url)
    slack.notify(text=text)

# notifyText holds the summary of updates accumulated while processing
notifyText = "【{0}】\n{1}".format(cityInfo.city, notifyText)
notifyToSlack(SLACK_WEBHOOK_HAMAMATSU, notifyText)
How do you get the webhook URL to pass to slackweb? Go to Incoming WebHooks, select the channel you want to notify, and press the "Add Incoming WebHooks integration" button to get the webhook URL. You can also change the icon and name here.
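As a quick sanity check (the URL below is a placeholder), you can also post to an incoming webhook directly without any library:

import requests

requests.post(
    "https://hooks.slack.com/services/XXX/YYY/ZZZ",  # placeholder webhook URL
    json={"text": "test message"},
)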
Here's how Slack is notified.
I noticed it has been about two months since my last post (4/15), so I'm writing this article in a hurry. Today being 6/14, I'm just barely safe. The mechanism itself was finished around 4/20, and I've been running it personally ever since.
I actually started building this in early April, hoping to notify the code4hamamatsu Slack channel so everyone would know when the data was updated.
However, we already had a setup where GitHub Actions polls, commits any changes, the Netlify build runs, and the result is posted to the Slack channel, so there was never an occasion to use this.
Partway through I got a bit discouraged, thinking "oh, nobody needs this", but I lowered the priority, finished it to the point where it works, and have been running it personally.
At the end of April, at JAWS-UG Hamamatsu AWS Study Group 2020 #4, I prepared to present it as an LT so everyone would know it existed. I even put it on the agenda, but ran out of time and never gave the talk.
I kept meaning to write it up as a Qiita article, and before I knew it two months had passed, right up to today. Ah! Poor child!!