I tried refactoring the template code posted in "Getting images from Flickr API with Python" (Part 1). That article covered fetching images through the Flickr API, but because everything runs sequentially, collecting many images across many keywords takes a long time. Here I rework it for parallel processing and measure how much faster it actually gets.
```python
from flickrapi import FlickrAPI
import requests
import os, time, sys
import configparser

# Image folder path
imgdir = os.path.join(os.getcwd(), "images")

# Query the Flickr API
def request_flickr(keyword, count=100, license=None):
    # Create a connected client and run the search
    config = configparser.ConfigParser()
    config.read('secret.ini')
    flickr = FlickrAPI(config["private"]["key"], config["private"]["secret"], format='parsed-json')
    result = flickr.photos.search(
        text=keyword,        # Search keyword
        per_page=count,      # Number of results to fetch
        media='photos',      # Collect photos only
        sort='relevance',    # Sort by relevance
        safe_search=1,       # Exclude violent/adult images
        extras='url_l, license'  # Extra fields to fetch (download URL and license)
    )
    return list(filter(lambda x: multiConditionLicenses(int(x["license"]), license), result["photos"]["photo"]))

def multiConditionLicenses(src, license=None):
    dst = []
    if license is None:
        dst.append(lambda x: 0 <= x)
    else:
        license_types = license.split("|")
        for t in license_types:
            if t == "All_Rights_Reserved":   # All rights reserved
                dst.append(lambda x: x == 0)
            elif t == "NonCommercial":       # Non-commercial licenses
                dst.append(lambda x: 1 <= x and x <= 3)
            elif t == "Commercial":          # Commercial-use licenses
                dst.append(lambda x: 4 <= x and x <= 6)
            elif t == "UnKnown":             # Unknown license
                dst.append(lambda x: x == 7)
            elif t == "US_Government_Work":  # United States government work
                dst.append(lambda x: x == 8)
            elif t == "PublicDomain":        # Public domain
                dst.append(lambda x: 9 <= x and x <= 10)
    return 0 < sum([item(src) for item in dst])

# Download from an image link
def download_img(url, file_name):
    r = requests.get(url, stream=True)
    if r.status_code == 200:
        with open(file_name, 'wb') as f:
            f.write(r.content)

if __name__ == "__main__":
    # Start measuring processing time
    start = time.time()
    # Read the query keywords
    query = None
    with open("query.txt") as fin:
        query = fin.readlines()
    query = [q.strip() for q in query]
    for keyword in query:
        # Create the save folder if it does not exist
        savedir = os.path.join(imgdir, keyword)
        if not os.path.isdir(savedir):
            os.mkdir(savedir)
        photos = request_flickr(keyword, count=500, license="NonCommercial|Commercial")
        # Keep only results that contain the "url_l" key, then download
        for photo in filter(lambda p: "url_l" in p.keys(), photos):
            url = photo['url_l']
            filepath = os.path.join(os.path.join(imgdir, keyword), photo['id'] + '.jpg')
            download_img(url, filepath)
            time.sleep(1)
    print('processing time', (time.time() - start), "Seconds")
```
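As a quick sanity check of the license filter: the conditions in multiConditionLicenses are OR-combined, so a photo passes if its license code falls in any of the selected ranges. The codes below follow the mapping in the function above:

```python
# Code 2 is in the NonCommercial range (1-3), so the combined filter accepts it
print(multiConditionLicenses(2, "NonCommercial|Commercial"))  # True
# Code 0 (All rights reserved) is outside both selected ranges
print(multiConditionLicenses(0, "NonCommercial|Commercial"))  # False
# With no license argument, any valid code (>= 0) passes
print(multiConditionLicenses(0))  # True
```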
- I want to run searches for multiple keywords against the Flickr API in parallel.
- I want to run the downloads from the image links in parallel.
I considered concurrent.futures.ThreadPoolExecutor for the parallel processing, but joblib is simpler to write, so I will use it. The whole dispatch fits in one line with a list comprehension:

```
Parallel(n_jobs=8)([delayed({callback_func})(param1, param2, ...) for {element} in {list}])
```
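For instance, a minimal self-contained run of this pattern looks like the following (the square function is just a stand-in worker for illustration):

```python
from joblib import Parallel, delayed

def square(x):
    return x * x

# Eight workers evaluate the delayed calls built by the list comprehension
results = Parallel(n_jobs=8)([delayed(square)(i) for i in range(16)])
print(results)  # [0, 1, 4, 9, ..., 225]
```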
Here I layer two levels of parallelism: the parent level sends requests for multiple keywords to the Flickr API, and once an API response arrives, the child level fetches the multiple image URLs obtained for that keyword.
```python
# Parent-level process
def main_process(keyword, count=100, wait_time=1):
    # Fetch and store the search results
    photos = request_flickr(keyword, count=count)
    # Download the images (calls the child-level process)
    Parallel(n_jobs=-1)([delayed(sub_process)(photo, keyword=keyword, wait_time=wait_time) for photo in photos])

# Child-level process
def sub_process(src, keyword, wait_time=1):
    url = "https://farm{farm_id}.staticflickr.com/{server_id}/{id}_{secret}.jpg" \
        .format(farm_id=src["farm"],
                server_id=src["server"],
                id=src["id"],
                secret=src["secret"])
    filepath = os.path.join(os.path.join(imgdir, keyword), src['id'] + '.jpg')
    download_img(url, filepath)
    time.sleep(wait_time)
```
```python
if __name__ == "__main__":
    ...
    query = ["Ikebukuro", "Otsuka", "Sugamo", "Komagome", "Tabata"]
    # Send requests for multiple keywords to the Flickr API (calls the parent-level process)
    Parallel(n_jobs=-1)([delayed(main_process)(keyword, count=500, wait_time=1) for keyword in query])
    ...
```
The n_jobs parameter sets the number of workers: 1 gives you plain sequential execution, and -1 uses as many workers as the machine has CPUs.
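The effect is easy to see with a toy task that sleeps instead of downloading (a small sketch; the timings depend on your CPU count):

```python
import time
from joblib import Parallel, delayed

def slow_task(i):
    time.sleep(1)  # stand-in for an I/O-bound download
    return i

# n_jobs=1 runs sequentially (about 8 s for 8 tasks)
start = time.time()
Parallel(n_jobs=1)([delayed(slow_task)(i) for i in range(8)])
print("sequential:", time.time() - start)

# n_jobs=-1 spreads the tasks over one worker per CPU
start = time.time()
Parallel(n_jobs=-1)([delayed(slow_task)(i) for i in range(8)])
print("parallel:", time.time() - start)
```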
For the keywords, I used the station names of the Yamanote Line.
query.txt:

```
Ikebukuro
Otsuka
Sugamo
Komagome
Tabata
Nishinippori
Nippori
Uguisudani
Ueno
Okachimachi
Akihabara
Kanda
Tokyo
Yurakucho
Shimbashi
Hamamatsucho
Tamachi
Shinagawa
Osaki
Gotanda
Meguro
Ebisu
Shibuya
Harajuku
Yoyogi
Shinjuku
Shin-Okubo
Takadanobaba
Mejiro
```
```python
from flickrapi import FlickrAPI
import requests
import os, time, sys
import configparser
from joblib import Parallel, delayed

# Image folder path
imgdir = os.path.join(os.getcwd(), "images")
__JOB_COUNT__ = 1

# Query the Flickr API
def request_flickr(keyword, count=100, license=None):
    # Create a connected client and run the search
    config = configparser.ConfigParser()
    config.read('secret.ini')
    flickr = FlickrAPI(config["private"]["key"], config["private"]["secret"], format='parsed-json')
    result = flickr.photos.search(
        text=keyword,      # Search keyword
        per_page=count,    # Number of results to fetch
        media='photos',    # Collect photos only
        sort='relevance',  # Sort by relevance
        safe_search=1,     # Exclude violent/adult images
        extras='license'   # Extra field to fetch (license)
    )
    return list(filter(lambda x: multiConditionLicenses(int(x["license"]), license), result["photos"]["photo"]))

def multiConditionLicenses(src, license=None):
    dst = []
    if license is None:
        dst.append(lambda x: 0 <= x)
    else:
        license_types = license.split("|")
        for t in license_types:
            if t == "All_Rights_Reserved":   # All rights reserved
                dst.append(lambda x: x == 0)
            elif t == "NonCommercial":       # Non-commercial licenses
                dst.append(lambda x: 1 <= x and x <= 3)
            elif t == "Commercial":          # Commercial-use licenses
                dst.append(lambda x: 4 <= x and x <= 6)
            elif t == "UnKnown":             # Unknown license
                dst.append(lambda x: x == 7)
            elif t == "US_Government_Work":  # United States government work
                dst.append(lambda x: x == 8)
            elif t == "PublicDomain":        # Public domain
                dst.append(lambda x: 9 <= x and x <= 10)
    return 0 < sum([item(src) for item in dst])

# Download from an image link
def download_img(url, file_name):
    r = requests.get(url, stream=True)
    if r.status_code == 200:
        with open(file_name, 'wb') as f:
            f.write(r.content)
    else:
        print("not download:{}".format(url))

# Parent-level process
def main_process(keyword, count=100, wait_time=1):
    # Fetch and store the search results
    photos = request_flickr(keyword, count=count)
    # Download the images (calls the child-level process)
    Parallel(n_jobs=__JOB_COUNT__)([delayed(sub_process)(photo, keyword=keyword, wait_time=wait_time) for photo in photos])

# Child-level process
def sub_process(src, keyword, wait_time=1):
    url = "https://farm{farm_id}.staticflickr.com/{server_id}/{id}_{secret}.jpg" \
        .format(farm_id=src["farm"],
                server_id=src["server"],
                id=src["id"],
                secret=src["secret"])
    filepath = os.path.join(os.path.join(imgdir, keyword), src['id'] + '.jpg')
    download_img(url, filepath)
    time.sleep(wait_time)

if __name__ == "__main__":
    # Start measuring processing time
    start = time.time()
    # Read the query keywords
    query = None
    with open("query.txt") as fin:
        query = fin.readlines()
    query = [q.strip() for q in query]
    # Create a save folder per keyword if it does not exist
    for keyword in query:
        savedir = os.path.join(imgdir, keyword)
        if not os.path.isdir(savedir):
            os.mkdir(savedir)
    # Send requests for multiple keywords to the Flickr API (calls the parent-level process)
    Parallel(n_jobs=__JOB_COUNT__)([delayed(main_process)(keyword, count=10, wait_time=1) for keyword in query])
    print('Parallel processing', (time.time() - start), "Seconds")
```
The difference from last time is that images are now fetched through the link https://farm{farm-id}.staticflickr.com/{server-id}/{id}_{secret}.jpg, which retrieves them more reliably (see Flickr API: Photo Source URLs).
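For illustration, here is how that URL is assembled from the fields of a search result (the photo record below is made up for the example):

```python
# Hypothetical photo record with the fields returned by flickr.photos.search
photo = {"farm": 5, "server": "4425", "id": "36428898410", "secret": "abc123def4"}
url = "https://farm{farm}.staticflickr.com/{server}/{id}_{secret}.jpg".format(**photo)
print(url)  # https://farm5.staticflickr.com/4425/36428898410_abc123def4.jpg
```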
This time the license parameter is not passed, since the purpose is to measure download speed. count is set to 10, which yields 290 images in total. The sleep time after downloading from each image URL is set to 0.5 seconds.
I then measured the processing time with the number of processes set to 1, 2, 4, 8, 16, 24, 32, and max (-1).
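The sweep can be driven with a loop like this. A sketch only: main_process_n is a hypothetical variant of main_process that takes the worker count as an argument, since a module-level __JOB_COUNT__ set in the parent would not propagate into joblib's worker processes:

```python
# Forward n explicitly so the inner Parallel call sees it too
def main_process_n(keyword, n, count=10, wait_time=1):
    photos = request_flickr(keyword, count=count)
    Parallel(n_jobs=n)([delayed(sub_process)(p, keyword=keyword, wait_time=wait_time) for p in photos])

# Time one full run for each worker count
for n in [1, 2, 4, 8, 16, 24, 32, -1]:
    start = time.time()
    Parallel(n_jobs=n)([delayed(main_process_n)(kw, n) for kw in query])
    print("n_jobs={}: {:.2f} sec".format(n, time.time() - start))
```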
| Number of processes | Processing time (sec) |
|---|---|
| 1 | 360.21357011795044 |
| 2 | 83.60558104515076 |
| 4 | 27.984444856643677 |
| 8 | 11.372981071472168 |
| 16 | 8.048759937286377 |
| 24 | 11.179131984710693 |
| 32 | 11.573050022125244 |
| max (n_jobs=-1) | 25.939302921295166 |
(Figure: raw data)

(Figure: processing time on a log scale)
Processing completes 40 to 50 times faster than sequential execution. :scream_cat:
Oddly, a fixed value of 16 was faster than n_jobs = -1, even though -1 is supposed to use the maximum. In this environment os.cpu_count() returns 4, so -1 resolves to only 4 workers, which probably explains it.
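Since the bottleneck here is network I/O rather than CPU work, running more workers than cores still pays off, and joblib's thread-based workers are another option worth trying (a sketch, not something measured in this post):

```python
# prefer="threads" uses threads instead of processes, which suits I/O-bound downloads
Parallel(n_jobs=16, prefer="threads")([delayed(main_process)(kw, count=10, wait_time=1) for kw in query])
```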
As an aside, the Flickr API is limited to 3,600 requests per hour, which is worth keeping in mind when using it in heavy loop processing.
Fluent Python Chapter 17 has a sample that downloads national flags in parallel, but the Flickr API used here feels more practical. It also makes a good subject for experimenting with Future or for more finely tuned parallel processing. :curry:
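For comparison, here is the same keyword fan-out written with concurrent.futures, the standard-library approach mentioned at the start (a minimal sketch assuming the functions from the listing above):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

# Submit one job per keyword and wait for them all to finish
with ThreadPoolExecutor(max_workers=16) as executor:
    futures = [executor.submit(main_process, kw, count=10, wait_time=1) for kw in query]
    for future in as_completed(futures):
        future.result()  # re-raises any exception from the worker
```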
- joblib.Parallel (official docs)
- I thoroughly investigated Python parallel and concurrent processing
- About parallel computing with Joblib in Python
- Python parallel processing (multiprocessing and Joblib)
- Fluent Python, Chapter 17 "Concurrency with futures" (Python 3.7 edition)
- [Parallel processing with Python's joblib](http://data-analysis-stats.jp/2019/10/24/python%E3%81%AEjoblib%E3%81%A7%E4%B8%A6%E5%88%97%E5%87%A6%E7%90%86/)