import datetime
import pathlib
import re
from urllib.parse import urljoin
import pandas as pd
import pdfplumber
import requests
from bs4 import BeautifulSoup
def fetch_file(url, dir="."):
    """Download *url* and save it under *dir*, returning the saved path.

    The file name is the last component of the URL's path. Any query string
    or fragment is dropped so it cannot end up in the name on disk.

    Args:
        url: Address of the file to download.
        dir: Directory to save into; created (with parents) if missing.
            The name shadows the builtin but is kept so keyword callers
            keep working.

    Returns:
        pathlib.Path of the file that was written.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.Timeout: If the server does not respond in time.
    """
    # Function-scope import: the module header only brings in ``urljoin``.
    from urllib.parse import urlsplit

    # Without a timeout, requests waits indefinitely on a stalled server.
    r = requests.get(url, timeout=60)
    r.raise_for_status()

    name = pathlib.PurePosixPath(urlsplit(url).path).name
    if not name:
        # URL has no path component (e.g. a bare host): keep the previous
        # naming rule rather than producing an empty file name.
        name = pathlib.PurePath(url).name

    p = pathlib.Path(dir, name)
    p.parent.mkdir(parents=True, exist_ok=True)
    p.write_bytes(r.content)
    return p
def days2date(s):
    """Convert a "month day" string into a Timestamp in the current year.

    Exactly two runs of one or two digits must be present in *s*; they are
    read as month and day. The year comes from the module-level ``dt_now``.

    Args:
        s: Cell text such as a month/day pair. Non-string values (for
            example ``None`` or NaN from an empty table cell) are accepted.

    Returns:
        pandas.Timestamp for the parsed date, or ``pandas.NaT`` when *s* is
        not a string, does not contain exactly two numbers, or the numbers
        do not form a valid calendar date.
    """
    # Empty table cells arrive as None/NaN; re.findall would raise on them.
    if not isinstance(s, str):
        return pd.NaT

    days = re.findall(r"[0-9]{1,2}", s)
    if len(days) != 2:
        return pd.NaT

    m, d = map(int, days)
    try:
        # NOTE(review): the year is always the current one, so rows from a
        # previous year would be stamped with this year -- confirm intent.
        return pd.Timestamp(year=dt_now.year, month=m, day=d)
    except ValueError:
        # Impossible month/day combination (e.g. 13/45): treat as unparsable.
        return pd.NaT
def wareki2date(s):
    """Parse a Japanese-era date found in *s* into a ``datetime.date``.

    The text is searched for ``<era><year>Year<month>Moon<day>Day`` where
    the era is Showa, Heisei or Reiwa. The era year is converted to a
    Gregorian year by adding the era offset (Showa 1925, Heisei 1988,
    Reiwa 2018). A year token equal to ``"Former"`` means year 1.

    Args:
        s: Text to search. Non-string values are treated as "no date found".

    Returns:
        datetime.date for the first era date found, or the date part of the
        module-level ``dt_now`` when nothing matches.

    Raises:
        ValueError: If the captured year token is not numeric, or the
            numbers do not form a valid calendar date.
    """
    # NOTE(review): the character class (including the letters of "yuan")
    # is kept exactly as it was; it looks like a translation artifact of the
    # first-year marker -- confirm against the real page text. "Former" is
    # added as an explicit alternative because the original class could
    # never capture it, which left the first-year branch below unreachable.
    pattern = (
        r"(Showa|Heisei|Reiwa)"
        r"(Former|[ 0-9 yuan]{1,2})"
        r"Year(\d{1,2})Moon(\d{1,2})Day"
    )

    # Text extraction can hand over None; re.search would raise on it.
    m = re.search(pattern, s) if isinstance(s, str) else None
    if m is None:
        # Call date(): returning the bound method would break callers that
        # use the result as a date (e.g. .strftime).
        return dt_now.date()

    era_offsets = {"Showa": 1925, "Heisei": 1988, "Reiwa": 2018}
    year, month, day = (
        1 if token == "Former" else int(token.strip())
        for token in m.group(2, 3, 4)
    )
    return datetime.date(year + era_offsets[m.group(1)], month, day)
# --- Script body: scrape the prefecture page, read every linked PDF table,
# --- and export the combined rows as a CSV named after the update date.

url = "https://www.pref.aichi.jp/site/covid19-aichi/"
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64; Trident/7.0; rv:11.0) like Gecko"
}

# Timezone-aware "now" at UTC+9; days2date() and wareki2date() read this
# module-level value.
JST = datetime.timezone(datetime.timedelta(hours=+9), "JST")
dt_now = datetime.datetime.now(JST)

# A timeout keeps the script from hanging forever on a stalled server.
r = requests.get(url, headers=headers, timeout=60)
r.raise_for_status()

soup = BeautifulSoup(r.content, "html.parser")

dfs = []
dt_text = ""

heading = soup.find("span", text="▶ Cases of occurrence in Aichi Prefecture")
if heading is None:
    # Fail with a clear message instead of an AttributeError on None.
    raise RuntimeError("Heading span with the PDF links was not found on the page")

# The dot is escaped so that only hrefs really ending in ".pdf" are selected.
pdf_links = heading.parent.find_all("a", href=re.compile(r"\.pdf$"))

# Links are walked in reverse document order, so dt_text ends up holding the
# header text of the last PDF processed (the first link on the page).
for tag in pdf_links[::-1]:
    link = urljoin(url, tag.get("href"))
    path_pdf = fetch_file(link)

    with pdfplumber.open(path_pdf) as pdf:
        for page in pdf.pages:
            if page.page_number == 1:
                # Strip of the first page between y=80 and y=90; it is parsed
                # by wareki2date() below. Normalise "no text" to "".
                dt_text = page.within_bbox((0, 80, page.width, 90)).extract_text() or ""

            table = page.extract_table()
            if not table:
                # Page without a detectable table: nothing to collect.
                continue

            # First row of each page's table is used as the column header.
            df_tmp = pd.DataFrame(table[1:], columns=table[0])
            dfs.append(df_tmp)

df = pd.concat(dfs).set_index("No")

# Rows whose announcement date cannot be parsed become NaT and are dropped.
df["Announcement date"] = df["Announcement date"].apply(days2date)
df.dropna(subset=["Announcement date"], inplace=True)

# Split the combined "Age / Gender" column into separate age and sex columns.
# The first group is lazy: a greedy ".+" would swallow the "Fe" of "Female"
# and report the sex as "male".
df_ages = (
    df["Age / Gender"]
    .str.extract(r"(.+?)(male|Female)")
    .rename(columns={0: "Age", 1: "sex"})
)
df = df.join(df_ages)

# Name the output file after the update date found in the PDF header.
dt_update = wareki2date(dt_text)
path_csv = pathlib.Path(dt_update.strftime("%Y%m%d") + ".csv")
df.to_csv(path_csv, encoding="utf_8_sig")
# Recommended Posts