package multiprocessing est une bibliothèque pour le traitement parallèle utilisant des processus. Ainsi, le titre «multiprocessing.pool.Pool.map» est une version de traitement parallèle de la carte.
Il sera utilisé comme suit.
from multiprocessing import Pool
import time
def iter():
for i in range(100):
print("{0} : iter {1}".format(time.time(), i))
yield i
time.sleep(2)
print("{0} : iter finished".format(time.time()))
def fun(n):
print("{0} : {1}".format(time.time(), n))
with Pool(4) as p: #Cartographie en 4 processus
p.map(fun, iter())
Alors, que se passe-t-il lorsque vous faites cela? Le résultat est le suivant, une fois que l'itérateur généré par la fonction iter a terminé toutes les itérations, la fonction est appliquée dans plusieurs processus.
$ python mp.py | sort -n
1424411166.882628 : iter 0
1424411166.882708 : iter 1
1424411166.882714 : iter 2
1424411166.882725 : iter 3
1424411166.88273 : iter 4
1424411166.882734 : iter 5
1424411166.882738 : iter 6
1424411166.882741 : iter 7
1424411166.882745 : iter 8
1424411166.882748 : iter 9
1424411166.882752 : iter 10
1424411166.882755 : iter 11
1424411166.882758 : iter 12
1424411166.882763 : iter 13
1424411166.882766 : iter 14
1424411166.88277 : iter 15
1424411166.882773 : iter 16
1424411166.882776 : iter 17
1424411166.88278 : iter 18
1424411166.882784 : iter 19
1424411168.884807 : iter finished
1424411168.890891 : 0
1424411168.891006 : 2
1424411168.891053 : 1
1424411168.891174 : 3
1424411168.891351 : 4
1424411168.891527 : 5
1424411168.891707 : 8
1424411168.89173 : 9
1424411168.89206 : 10
1424411168.892085 : 11
1424411168.892139 : 12
1424411168.892162 : 13
1424411168.892473 : 14
1424411168.892483 : 16
1424411168.892495 : 15
1424411168.892506 : 17
1424411168.892599 : 18
1424411168.892619 : 19
En regardant la mise en œuvre, c'était comme suit.
multiprocessing/pool.py
def map(self, func, iterable, chunksize=None):
'''
Apply `func` to each element in `iterable`, collecting the results
in a list that is returned.
'''
return self._map_async(func, iterable, mapstar, chunksize).get()
:
:
def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
error_callback=None):
'''
Helper function to implement map, starmap and their async counterparts.
'''
if self._state != RUN:
raise ValueError("Pool not running")
if not hasattr(iterable, '__len__'):
iterable = list(iterable)
if chunksize is None:
chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
if extra:
chunksize += 1
if len(iterable) == 0:
chunksize = 0
task_batches = Pool._get_tasks(func, iterable, chunksize)
result = MapResult(self._cache, chunksize, len(iterable), callback,
error_callback=error_callback)
self._taskqueue.put((((result._job, i, mapper, (x,), {})
for i, x in enumerate(task_batches)), None))
return result
S'il n'y a pas de propriété len dans ʻif not hasattr (iterable, '__ len __') , il semble que
list (iterable) `convertit l'itérateur en une liste. Donc, avant d'appliquer la fonction, tous les itérateurs sont terminés.
Recommended Posts