When you search for Python parallel processing, the first things that come up are multiprocessing and Joblib.
There are commentary articles on both, and Joblib has some advantages over multiprocessing:

- The function being parallelized can take arguments other than a single array.
- Child processes are terminated when the parent is killed with Ctrl+C.
- There is an option to automatically display the overall progress.

Because of these conveniences I normally used Joblib. However, I recently had a reason to use multiprocessing, and when I compared the speed of the two, multiprocessing turned out to be faster for reasons I don't understand, so I'm leaving this note as a memo.
## Joblib
It can be installed with pip (`pip install joblib`).
To briefly show how to use it, suppose there is a function like this:
```python
def process(i):
    return [{'id': j, 'sum': sum(range(i*j))} for j in range(100)]
```
You can then write the parallel call like so:
```python
from joblib import Parallel, delayed

result = Parallel(n_jobs=-1)([delayed(process)(n) for n in range(1000)])
```
This executes the `process` function in parallel. The number of cores used is controlled by `n_jobs`: -1 runs on as many cores as are available, while 1 is equivalent to a normal single-process run. Simple and convenient.
Even if you specify a number larger than the number of available cores in `n_jobs`, it won't crash; the work just seems to get distributed across the cores that are actually available.
In addition to `n_jobs`, `Parallel` takes a `verbose` argument: if you specify a value from 0 to 10, progress is printed with a frequency corresponding to that value.
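For instance, a call along these lines (a minimal sketch reusing the `process` function from above) runs on every available core while printing progress messages:

```python
from joblib import Parallel, delayed

def process(i):
    return [{'id': j, 'sum': sum(range(i*j))} for j in range(100)]

if __name__ == '__main__':
    # n_jobs=-1: use every available core; verbose=5: report progress periodically
    result = Parallel(n_jobs=-1, verbose=5)(
        [delayed(process)(n) for n in range(1000)]
    )
```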
## multiprocessing
Since multiprocessing is part of the Python standard library, you can use it without installing anything. It offers a lot of functionality, but the simplest way to use it looks like this:
```python
from multiprocessing import Pool
import multiprocessing as multi

p = Pool(multi.cpu_count())
p.map(process, list(range(1000)))
p.close()
```
Specify the degree of parallelism with `Pool` and run the function in parallel with `map`. The maximum number of usable cores can be obtained with `multi.cpu_count()`. Note that if you forget to close a `Pool` you created with `close()`, it keeps consuming memory and can become a real problem.
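As an aside, `Pool` can also be used as a context manager, so cleanup happens automatically even if an exception is raised; a minimal sketch:

```python
from multiprocessing import Pool
import multiprocessing as multi

def process(i):
    return [{'id': j, 'sum': sum(range(i*j))} for j in range(100)]

if __name__ == '__main__':
    # leaving the with-block shuts down the worker processes automatically
    with Pool(multi.cpu_count()) as p:
        result = p.map(process, range(1000))
```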
The biggest advantage of Joblib over multiprocessing is that the function being parallelized can take multiple arguments directly rather than a single packed argument.
For example, suppose there is a function that takes multiple arguments:

```python
def process(i, s):
    return [{'id': i, 'str': s, 'sum': sum(range(i*j))} for j in range(100)]
```

With Joblib you can run it like this:

```python
strs = ['a', 'b', 'c']
result = Parallel(n_jobs=-1)([delayed(process)(i, s) for i, s in enumerate(strs * 1000)])
```
However, if you try to do the same thing with multiprocessing,
```python
p = Pool(multi.cpu_count())
strs = ['a', 'b', 'c']
p.map(process, enumerate(strs * 1000))
p.close()
```
it complains about the arguments and raises an error:

```
TypeError: process() missing 1 required positional argument: 's'
```
In this case, you have to rewrite the function so that it takes everything packed into a single argument:

```python
def process(v):
    return [{'id': v[0], 'str': v[1], 'sum': sum(range(v[0]*j))} for j in range(100)]
```
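Incidentally, `Pool.starmap` unpacks argument tuples, so the two-argument version also works without repacking. A minimal sketch showing both approaches (`process2` is just the name used here for the two-argument version):

```python
from multiprocessing import Pool
import multiprocessing as multi

def process(v):
    # v packs (i, s) into a single argument
    return [{'id': v[0], 'str': v[1], 'sum': sum(range(v[0]*j))} for j in range(100)]

def process2(i, s):
    # the two-argument version, renamed here so both fit in one file
    return [{'id': i, 'str': s, 'sum': sum(range(i*j))} for j in range(100)]

if __name__ == '__main__':
    strs = ['a', 'b', 'c']
    with Pool(multi.cpu_count()) as p:
        result = p.map(process, enumerate(strs * 1000))         # single packed argument
        result2 = p.starmap(process2, enumerate(strs * 1000))   # starmap unpacks the tuples
```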
Now, on to the main topic: the speed comparison. Using the function

```python
def process(n):
    return sum([i*n for i in range(100000)])
```

I ran it in parallel with each library and measured the speed.
```python
def usejoblib(job, num):
    Parallel(n_jobs=job)([delayed(process)(n) for n in range(num)])

def usemulti(job, num):
    p = Pool(multi.cpu_count() if job < 0 else job)
    p.map(process, list(range(num)))
    p.close()
```
Here are the results: for each loop count, 10 runs were measured and averaged (8 jobs, unit: sec).
loop_n | normal | Joblib | multiprocessing |
---|---|---|---|
10 | 0.0441 | 0.113 | 0.0217 |
100 | 0.414 | 0.211 | 0.139 |
1000 | 4.16 | 1.32 | 1.238 |
10000 | 41.1 | 12.5 | 12.2 |
100000 | 430 | 123 | 119 |
Both are much faster than running without parallelization. There doesn't seem to be much difference between the two (though the multiprocessing version looks slightly faster?).
If we increase the amount of processing in each loop iteration:

```python
def process(i):
    return [{'id': j, 'sum': sum(range(i*j))} for j in range(1000)]
```
loop_n | normal | Joblib | multiprocessing |
---|---|---|---|
10 | 0.25 | 0.21 | 0.07 |
100 | 28.4 | 13.2 | 7.53 |
1000 | - | 737 | 701 |
Again, multiprocessing was a little faster, and I couldn't figure out why. With a difference this small, the result could well be reversed depending on the execution environment and the load of the function being run. In any case, both are far faster than an ordinary loop, so I want to make aggressive use of parallelization whenever I do loop-heavy processing in Python.
Finally, here is the code used for these measurements.
```python
import time
from joblib import Parallel, delayed
from multiprocessing import Pool
import multiprocessing as multi
from more_itertools import flatten
import sys
import functools


def process(i):
    return [{'id': j, 'sum': sum(range(i*j))} for j in range(1000)]

# def process(n):
#     return sum([i*n for i in range(100000)])


def usejoblib(job, num):
    result = Parallel(n_jobs=job)([delayed(process)(n) for n in range(num)])
    return result


def usemulti(job, num):
    p = Pool(multi.cpu_count() if job < 0 else job)
    result = p.map(process, range(num))
    p.close()
    return result


if __name__ == '__main__':
    argv = sys.argv
    total = 0
    n = 1
    for i in range(n):
        s = time.time()
        if argv[1] == 'joblib':
            result = usejoblib(int(argv[2]), int(argv[3]))
        elif argv[1] == 'multi':
            result = usemulti(int(argv[2]), int(argv[3]))
        else:
            result = [process(j) for j in range(int(argv[3]))]
        elapsed = time.time() - s
        print('time: {0} [sec]'.format(elapsed))
        total += elapsed
    print('--------')
    print('average: {0} [sec]'.format(total/n))
    sums = functools.reduce(lambda x, y: x + y['sum'], list(flatten(result)), 0)
    print('total: {0}'.format(sums))
    # print('total: {0}'.format(sum(result)))
```
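The script takes the mode (`joblib`, `multi`, or anything else for a plain single-process loop), the number of jobs, and the number of loops as command-line arguments, so it can be run like `python your_script.py multi -1 1000` (substitute whatever file name you saved it under).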