Un wrapper Python pour l'API Qiita pour les lots créés dans Obtenez "presque" tous les articles avec l'API Qiita.
Je n'ai pas si bien réussi, alors quand j'ai pensé à le rafraîchir, je n'ai rien fait et je l'ai laissé pendant un mois. .. .. Étant donné que l'API a également été mise à niveau, nous ne prendrons en charge que la version 2 et la publierons pour le moment.
Ceci est un exemple de code qui récupère jusqu'à 5 pages de 100 nouveaux articles et les enregistre dans un fichier. Le processus de pagination étant effectué en interne, il n'est pas nécessaire d'écrire une double boucle.
qiita2.wait_seconds = 0
for item in qiita2.items(100, 5):
print(item["title"])
qiita2.save_item(item)
L'appel de ʻitems ()
`retourne l'itérateur de l'objet json du message, vous pouvez donc l'utiliser tel quel dans une instruction for ou un tri.
Je n'ai fait que ce dont j'avais besoin, alors c'est tout.
Défini avec la variable module de qiita2.
qiita2.auth_token = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
Défini avec la variable module de qiita2.
Nom de variable | Défaut | Contenu |
---|---|---|
default_per_page | 100 | Nombre d'acquisitions par page(Par dans chaque API_Valeur par défaut lorsque la page est omise) |
default_max_page | 100 | Nombre maximum de pages(Max dans chaque API_Valeur par défaut lorsque la page est omise) |
wait_seconds | 12 | Attendez quelques secondes avant d'envoyer la demande |
retry_wait_min | 1 | Attendez de réessayer lorsqu'une erreur se produit(Minutes) |
retry_limit | 10 | Limiter le nombre de tentatives lorsqu'une erreur se produit |
API
Renvoie l'itérateur de la nouvelle liste d'articles.
#Obtenez avec le nombre d'acquisitions par défaut et le nombre maximum de pages
items = qiita2.items()
#Obtenez en précisant le nombre d'acquisitions et le nombre maximum de pages
items = qiita2.items(per_page=20, max_page=5)
Renvoie l'itérateur de la liste de balises.
items = qiita2.tags()
items = qiita2.tags(per_page, max_page)
Renvoie l'itérateur de la liste de publications de balises spécifiques.
items = qiita2.tag_items(tag_url)
items = qiita2.tag_items(tag_url, per_page, max_page)
Obtenu à partir de l'en-tête de réponse `` Total-Count ''.
len(qiita2.items())
Le nom du fichier est
data / items /
qiita2.save_item(item)
Puisqu'il s'agit d'un fichier, vous pouvez l'utiliser tel quel en copiant.
qiita2.py
import time
import codecs
import json
from logging import getLogger
import requests
from urllib.parse import urlparse, parse_qs
logger = getLogger(__name__)
URL_ITEMS = "https://qiita.com/api/v2/items"
URL_TAG_ITEMS = "https://qiita.com/api/v2/tags/%s/items"
URL_TAGS = "https://qiita.com/api/v2/tags"
HEADER_TOTAL = "Total-Count"
LINK_NEXT = "next"
LINK_LAST = "last"
default_per_page = 100
default_max_page = 100
wait_seconds = 12
retry_wait_min = 1
retry_limit = 10
auth_token = None
def items(per_page = default_per_page, max_page = default_max_page):
req = QiitaRequest(URL_ITEMS, per_page, max_page)
return QiitaIterator(req)
def tag_items(tag_url, per_page = default_per_page, max_page = default_max_page):
req = QiitaRequest(URL_TAG_ITEMS % tag_url, per_page, max_page)
return QiitaIterator(req)
def tags(per_page = default_per_page, max_page = default_max_page):
req = QiitaRequest(URL_TAGS, per_page, max_page)
return QiitaIterator(req)
class QiitaIterator:
def __init__(self, req):
self.req = req
self.items = req.request().__iter__()
def __iter__(self):
return self
def __next__(self):
if self.items == None: raise StopIteration
try:
val = self.items.__next__()
return val
except StopIteration:
if self.req.has_next():
self.items = self.req.next().__iter__()
return self.__next__()
else:
raise StopIteration
def __len__(self):
return self.req.total_count()
class QiitaRequest:
last_request_time = None
retry_num = 0
def __init__(self, url, per_page = default_per_page, max_page = default_max_page, page = 1):
self.url = url
self.per_page = per_page
self.max_page = max_page
self.page = page
self.res = None
self.current_page = None
def request(self):
self.links = dict()
params = {"per_page": self.per_page, "page": self.page}
return self.__request__(self.url, params)
def __request__(self, url, params = None):
self.__wait__()
logger.info("url:%s" % url)
headers = {"Authorization": "Bearer " + auth_token} if auth_token != None else None
self.res = requests.get(url, params = params, headers = headers)
status = self.res.status_code
while status != 200 and QiitaRequest.retry_num <= retry_limit:
logger.warning("status:%d" % status)
logger.warn(u"%Attendez d minutes." % retry_wait_min)
time.sleep(retry_wait_min * 60)
QiitaRequest.retry_num = QiitaRequest.retry_num + 1
self.res = requests.get(url, params = params)
status = self.res.status_code
if status != 200:
logger.warning("status:%d" % status)
logger.warning(self.res.text)
return None
QiitaRequest.retry_num = 0
return self.res.json()
def next(self):
if not self.has_next(): raise Exception()
#Par en-tête de réponse Link dans la v2_Résoudre un bug où la page est manquante
params = {"per_page": self.per_page}
return self.__request__(self.res.links[LINK_NEXT]["url"], params)
def retry(self):
pass
def has_error(self):
pass
def has_next(self):
if not LINK_NEXT in self.res.links: return False
url = self.res.links[LINK_NEXT]["url"]
page = self.__get_page__(url)
return page <= self.max_page
def last_page(self):
url = self.res.links[LINK_LAST]["url"]
return self.__get_page__(url)
def total_count(self):
return int(self.res.headers[HEADER_TOTAL])
def __get_page__(self, url):
query = urlparse(url).query
page = parse_qs(query)["page"][0]
return int(page)
def __wait__(self):
if QiitaRequest.last_request_time != None:
last = QiitaRequest.last_request_time
now = time.clock()
wait = wait_seconds - (now - last)
if 0 < wait:
time.sleep(wait)
QiitaRequest.last_request_time = time.clock()
def save_item(item):
item_id = item["id"]
filename = "data/items/%s.json" % item_id
with codecs.open(filename, "w", "utf-8") as f:
f.write(json.dumps(item, indent = 4, ensure_ascii=False))
Recommended Posts