The `multiprocessing` package is a library for parallel processing using processes, and `multiprocessing.pool.Pool.map`, the subject of this post, is a concurrent version of `map`.
It is used as follows.
from multiprocessing import Pool
import time

def iter():
    for i in range(20):
        print("{0} : iter {1}".format(time.time(), i))
        yield i
    time.sleep(2)  # pause after the last item, before the generator finishes
    print("{0} : iter finished".format(time.time()))

def fun(n):
    print("{0} : {1}".format(time.time(), n))

if __name__ == "__main__":  # guard needed where worker processes are spawned
    with Pool(4) as p:  # map in 4 processes
        p.map(fun, iter())
So what happens when you run this? As the output below shows, the function is applied in the worker processes only after the iterator returned by iter() has finished all of its iterations.
$ python mp.py | sort -n
1424411166.882628 : iter 0
1424411166.882708 : iter 1
1424411166.882714 : iter 2
1424411166.882725 : iter 3
1424411166.88273 : iter 4
1424411166.882734 : iter 5
1424411166.882738 : iter 6
1424411166.882741 : iter 7
1424411166.882745 : iter 8
1424411166.882748 : iter 9
1424411166.882752 : iter 10
1424411166.882755 : iter 11
1424411166.882758 : iter 12
1424411166.882763 : iter 13
1424411166.882766 : iter 14
1424411166.88277 : iter 15
1424411166.882773 : iter 16
1424411166.882776 : iter 17
1424411166.88278 : iter 18
1424411166.882784 : iter 19
1424411168.884807 : iter finished
1424411168.890891 : 0
1424411168.891006 : 2
1424411168.891053 : 1
1424411168.891174 : 3
1424411168.891351 : 4
1424411168.891527 : 5
1424411168.891707 : 8
1424411168.89173 : 9
1424411168.89206 : 10
1424411168.892085 : 11
1424411168.892139 : 12
1424411168.892162 : 13
1424411168.892473 : 14
1424411168.892483 : 16
1424411168.892495 : 15
1424411168.892506 : 17
1424411168.892599 : 18
1424411168.892619 : 19
The implementation looks like this.
multiprocessing/pool.py
    def map(self, func, iterable, chunksize=None):
        '''
        Apply `func` to each element in `iterable`, collecting the results
        in a list that is returned.
        '''
        return self._map_async(func, iterable, mapstar, chunksize).get()
    :
    :
    def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
            error_callback=None):
        '''
        Helper function to implement map, starmap and their async counterparts.
        '''
        if self._state != RUN:
            raise ValueError("Pool not running")
        if not hasattr(iterable, '__len__'):
            iterable = list(iterable)
        if chunksize is None:
            chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
            if extra:
                chunksize += 1
        if len(iterable) == 0:
            chunksize = 0
        task_batches = Pool._get_tasks(func, iterable, chunksize)
        result = MapResult(self._cache, chunksize, len(iterable), callback,
                           error_callback=error_callback)
        self._taskqueue.put((((result._job, i, mapper, (x,), {})
                              for i, x in enumerate(task_batches)), None))
        return result
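As an aside on the `chunksize` branch in the quoted code: the default is computed from `len(iterable)`, which is one reason the code needs a sized sequence at this point. The following is a minimal sketch of that arithmetic only, not the library's own code path. `default_chunksize` and `split_into_chunks` are hypothetical helper names introduced for illustration, and the chunk splitting is a simplified stand-in for what `Pool._get_tasks` is assumed to do.

```python
def default_chunksize(n_items, n_workers):
    """Repeat the chunksize arithmetic from the quoted _map_async.

    Hypothetical helper for illustration; not part of multiprocessing.
    """
    chunksize, extra = divmod(n_items, n_workers * 4)
    if extra:
        chunksize += 1
    if n_items == 0:
        chunksize = 0
    return chunksize


def split_into_chunks(items, chunksize):
    """Cut a list into consecutive chunks (simplified stand-in, an assumption)."""
    if chunksize == 0:
        return []
    return [items[i:i + chunksize] for i in range(0, len(items), chunksize)]


if __name__ == "__main__":
    # 20 items and Pool(4), as in the output shown earlier.
    size = default_chunksize(20, 4)
    print(size)
    print(split_into_chunks(list(range(20)), size))
```

With 20 items and 4 processes, `divmod(20, 16)` leaves a remainder, so the chunk size is rounded up to 2 and the work is handed out as pairs of consecutive items.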
In `_map_async`, when the iterable has no `__len__` attribute, the check `if not hasattr(iterable, '__len__')` succeeds and the iterator is converted to a list with `iterable = list(iterable)`. So the iterator is run to completion before the function is applied to any element.
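The effect of that check can be reproduced without a pool. The sketch below is only an illustration: `numbers` and `materialize_like_map_async` are hypothetical names, and the helper copies just the two quoted lines rather than the rest of `_map_async`. It records when each item is produced, so the order of events is visible.

```python
def numbers(n, log):
    """Generator that records when each item is produced."""
    for i in range(n):
        log.append(("produced", i))
        yield i
    log.append(("exhausted", None))


def materialize_like_map_async(iterable):
    """Repeat the __len__ check from the quoted code (hypothetical helper)."""
    if not hasattr(iterable, "__len__"):
        iterable = list(iterable)  # runs a generator to completion
    return iterable


if __name__ == "__main__":
    log = []
    gen = numbers(3, log)
    print(hasattr(gen, "__len__"))  # False: generators have no __len__
    items = materialize_like_map_async(gen)
    log.append(("ready to dispatch", len(items)))
    for entry in log:
        print(entry)
```

Every "produced" entry and the "exhausted" entry are logged before "ready to dispatch", which matches the timestamps in the output above. A list passed to the same helper comes back unchanged, because it already has `__len__`.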