A Python wrapper for the Qiita API, intended for batch jobs, which I originally wrote for the article "Get 'almost' all posts with the Qiita API".
It was not very well made, so I meant to brush it up, but I ended up doing nothing and leaving it for a month... Since the API has also been upgraded in the meantime, I am releasing it for now with support for v2 only.
This is a code example that retrieves up to 5 pages of 100 new posts and saves them in a file. Since the paging process is done internally, there is no need to write a double loop.
qiita2.wait_seconds = 0
for item in qiita2.items(100, 5):
print(item["title"])
qiita2.save_item(item)
Calling `items()` returns an iterator over the posts' JSON objects,
so you can use it as is in a for statement or sort.
I made only what I needed, so that's it.
The access token is set through a module variable of qiita2.
qiita2.auth_token = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
The settings are configured through module variables of qiita2.
Variable name | Default | Contents |
---|---|---|
default_per_page | 100 | Number of items fetched per page (default used by each API when `per_page` is omitted) |
default_max_page | 100 | Maximum number of pages (default used by each API when `max_page` is omitted) |
wait_seconds | 12 | Wait seconds before sending request |
retry_wait_min | 1 | Wait time before retrying when an error occurs (minutes) |
retry_limit | 10 | Limit the number of retries when an error occurs |
API
Returns the iterator of the new post list.
#Get with default number of acquisitions and maximum number of pages
items = qiita2.items()
#Get by specifying the number of acquisitions and the maximum number of pages
items = qiita2.items(per_page=20, max_page=5)
Returns the iterator of the tag list.
items = qiita2.tags()
items = qiita2.tags(per_page, max_page)
Returns an iterator for a list of specific tag posts.
items = qiita2.tag_items(tag_url)
items = qiita2.tag_items(tag_url, per_page, max_page)
The total number of items is taken from the `Total-Count`
response header.
len(qiita2.items())
The file name is `data/items/<post ID>.json`.
← Is the save destination fixed?
qiita2.save_item(item)
Since it is one file, you can use it as it is by copying and pasting.
qiita2.py
import codecs
import json
import os
import time
from logging import getLogger
from urllib.parse import parse_qs, urlparse

import requests
# Module-level logger; request URLs, non-200 statuses and retry waits are
# reported through it.
logger = getLogger(__name__)
# Qiita API v2 endpoints. URL_TAG_ITEMS takes the tag identifier via "%s".
URL_ITEMS = "https://qiita.com/api/v2/items"
URL_TAG_ITEMS = "https://qiita.com/api/v2/tags/%s/items"
URL_TAGS = "https://qiita.com/api/v2/tags"
# Response header that carries the total number of results.
HEADER_TOTAL = "Total-Count"
# Relation names looked up in the parsed Link response header.
LINK_NEXT = "next"
LINK_LAST = "last"
# Number of items requested per page when per_page is omitted.
default_per_page = 100
# Highest page number that will be fetched when max_page is omitted.
default_max_page = 100
# Minimum interval, in seconds, kept between two consecutive requests.
wait_seconds = 12
# Pause, in minutes, before a failed request is retried.
retry_wait_min = 1
# Upper bound on the retry counter for failed requests.
retry_limit = 10
# Access token sent as "Authorization: Bearer <token>"; None sends no header.
auth_token = None
def items(per_page=None, max_page=None):
    """Return an iterator over the list of new posts.

    Args:
        per_page: Number of posts requested per page. ``None`` (the default)
            uses the module variable ``default_per_page``.
        max_page: Highest page number to fetch. ``None`` (the default) uses
            the module variable ``default_max_page``.

    Returns:
        A ``QiitaIterator`` yielding one JSON object (dict) per post.
    """
    # Resolve the defaults at call time: binding them in the signature would
    # freeze the values at import and ignore later changes to the settings.
    if per_page is None:
        per_page = default_per_page
    if max_page is None:
        max_page = default_max_page
    return QiitaIterator(QiitaRequest(URL_ITEMS, per_page, max_page))
def tag_items(tag_url, per_page=None, max_page=None):
    """Return an iterator over the posts attached to one tag.

    Args:
        tag_url: Tag identifier inserted into the tag-items endpoint URL.
        per_page: Number of posts requested per page. ``None`` (the default)
            uses the module variable ``default_per_page``.
        max_page: Highest page number to fetch. ``None`` (the default) uses
            the module variable ``default_max_page``.

    Returns:
        A ``QiitaIterator`` yielding one JSON object (dict) per post.
    """
    # Resolve the defaults at call time: binding them in the signature would
    # freeze the values at import and ignore later changes to the settings.
    if per_page is None:
        per_page = default_per_page
    if max_page is None:
        max_page = default_max_page
    return QiitaIterator(QiitaRequest(URL_TAG_ITEMS % tag_url, per_page, max_page))
def tags(per_page=None, max_page=None):
    """Return an iterator over the list of tags.

    Args:
        per_page: Number of tags requested per page. ``None`` (the default)
            uses the module variable ``default_per_page``.
        max_page: Highest page number to fetch. ``None`` (the default) uses
            the module variable ``default_max_page``.

    Returns:
        A ``QiitaIterator`` yielding one JSON object (dict) per tag.
    """
    # Resolve the defaults at call time: binding them in the signature would
    # freeze the values at import and ignore later changes to the settings.
    if per_page is None:
        per_page = default_per_page
    if max_page is None:
        max_page = default_max_page
    return QiitaIterator(QiitaRequest(URL_TAGS, per_page, max_page))
class QiitaIterator:
    """Iterator that walks every element of a paged API listing.

    The wrapped request object must provide ``request()``, ``has_next()``,
    ``next()`` and ``total_count()``. ``request()`` and ``next()`` return the
    decoded elements of one page, or ``None`` when the request failed.

    The first page is fetched in the constructor, so ``len()`` can be used
    immediately after construction.
    """

    def __init__(self, req):
        """Fetch the first page of *req* and prepare to iterate over it."""
        self.req = req
        self.items = self._page_iter(req.request())

    @staticmethod
    def _page_iter(page):
        """Return an iterator over *page*, or None for a failed request."""
        return None if page is None else iter(page)

    def __iter__(self):
        return self

    def __next__(self):
        """Return the next element, moving on to the next page when needed.

        Raises:
            StopIteration: when the last allowed page is exhausted or a page
                request failed.
        """
        # A loop (rather than recursion) keeps the stack flat even when
        # several consecutive pages turn out to be empty.
        while self.items is not None:
            try:
                return next(self.items)
            except StopIteration:
                if not self.req.has_next():
                    break
                self.items = self._page_iter(self.req.next())
        # Stay exhausted on any further call.
        self.items = None
        raise StopIteration

    def __len__(self):
        """Return the total number of results reported by the request."""
        return self.req.total_count()
class QiitaRequest:
    """A paged GET request against one Qiita API v2 endpoint.

    The module-level settings ``wait_seconds``, ``retry_wait_min``,
    ``retry_limit`` and ``auth_token`` are read each time a request is sent.
    ``last_request_time`` and ``retry_num`` are class attributes, so the
    request interval and the retry counter are shared by all instances.
    """

    # time.monotonic() value taken when the previous request was sent.
    last_request_time = None
    # Retries performed since the last successful response.
    retry_num = 0

    def __init__(self, url, per_page=None, max_page=None, page=1):
        """Store the request parameters without sending anything.

        Args:
            url: Endpoint URL.
            per_page: Elements requested per page. ``None`` uses the module
                variable ``default_per_page``.
            max_page: Highest page number ``has_next()`` will allow. ``None``
                uses the module variable ``default_max_page``.
            page: Page number requested by ``request()``.
        """
        self.url = url
        # Defaults are resolved here, not in the signature, so that changes
        # made to the module settings after import are honoured.
        self.per_page = default_per_page if per_page is None else per_page
        self.max_page = default_max_page if max_page is None else max_page
        self.page = page
        self.res = None
        self.current_page = None

    def request(self):
        """Fetch the configured page; return its JSON body or None on failure."""
        self.links = dict()
        params = {"per_page": self.per_page, "page": self.page}
        return self.__request__(self.url, params)

    def __request__(self, url, params=None):
        """Send a GET request, retrying while the status is not 200.

        Args:
            url: URL to fetch.
            params: Query parameters passed to ``requests.get``.

        Returns:
            The decoded JSON body, or ``None`` if the status is still not 200
            once the retry counter has reached ``retry_limit``.
        """
        self.__wait__()
        logger.info("url:%s", url)
        headers = None
        if auth_token is not None:
            headers = {"Authorization": "Bearer " + auth_token}
        self.res = requests.get(url, params=params, headers=headers)
        status = self.res.status_code
        while status != 200 and QiitaRequest.retry_num < retry_limit:
            logger.warning("status:%d", status)
            logger.warning("Wait %d minutes.", retry_wait_min)
            time.sleep(retry_wait_min * 60)
            QiitaRequest.retry_num += 1
            # Retries carry the same headers, so authenticated requests stay
            # authenticated.
            self.res = requests.get(url, params=params, headers=headers)
            status = self.res.status_code
        if status != 200:
            logger.warning("status:%d", status)
            logger.warning(self.res.text)
            return None
        QiitaRequest.retry_num = 0
        return self.res.json()

    def next(self):
        """Fetch the page the last response's Link header marks as next.

        Raises:
            Exception: if ``has_next()`` is false.
        """
        if not self.has_next():
            raise Exception("no next page is available")
        # The v2 Link header omits per_page, so it is supplied again here.
        params = {"per_page": self.per_page}
        return self.__request__(self.res.links[LINK_NEXT]["url"], params)

    def retry(self):
        """Placeholder; does nothing."""
        pass

    def has_error(self):
        """Placeholder; does nothing and returns None."""
        pass

    def has_next(self):
        """Return True if a next page exists and is within ``max_page``."""
        if self.res is None or LINK_NEXT not in self.res.links:
            return False
        next_url = self.res.links[LINK_NEXT]["url"]
        return self.__get_page__(next_url) <= self.max_page

    def last_page(self):
        """Return the page number of the last response's "last" link."""
        url = self.res.links[LINK_LAST]["url"]
        return self.__get_page__(url)

    def total_count(self):
        """Return the Total-Count header of the last response as an int."""
        return int(self.res.headers[HEADER_TOTAL])

    def __get_page__(self, url):
        """Return the integer value of the ``page`` query parameter of *url*."""
        query = urlparse(url).query
        return int(parse_qs(query)["page"][0])

    def __wait__(self):
        """Sleep until ``wait_seconds`` have passed since the previous request."""
        last = QiitaRequest.last_request_time
        if last is not None:
            # A monotonic wall clock is required here: the interval must
            # include time spent sleeping, and time.clock() no longer exists.
            remaining = wait_seconds - (time.monotonic() - last)
            if remaining > 0:
                time.sleep(remaining)
        QiitaRequest.last_request_time = time.monotonic()
def save_item(item, directory="data/items"):
    """Write one post to ``<directory>/<post ID>.json`` as UTF-8 JSON.

    Args:
        item: JSON object (dict) of a post; its ``"id"`` value becomes the
            file name.
        directory: Destination directory. It is created, including missing
            parents, if it does not exist yet.

    Raises:
        KeyError: if *item* has no ``"id"`` key.
    """
    os.makedirs(directory, exist_ok=True)
    filename = os.path.join(directory, "%s.json" % item["id"])
    # newline="\n" keeps the output bytes identical on every platform.
    with open(filename, "w", encoding="utf-8", newline="\n") as f:
        json.dump(item, f, indent=4, ensure_ascii=False)
Recommended Posts