Traitement parallèle Python (multitraitement et Joblib)

introduction

Lors d'une recherche avec un traitement parallèle Python, la première chose qui apparaît est multiprocessing ou Joblib est.

Il existe divers articles de commentaires sur les deux, mais Joblib est meilleur que le multitraitement.

Puisqu'il existe de nombreuses fonctions utiles telles que, j'ai l'habitude de l'utiliser. Cependant, j'ai eu l'occasion d'utiliser le multitraitement pour une raison quelconque, et quand j'ai comparé les vitesses des deux, je n'ai pas compris la raison, mais la parallélisation avec le multitraitement était plus rapide, je vais donc laisser un mémorandum.

Environnement d'exécution

Joblib

L'installation peut être effectuée avec pip.

Pour montrer brièvement comment l'utiliser,

def process(i):
	return [{'id': j, 'sum': sum(range(i*j))} for j in range(100)]

S'il y a une fonction comme

from joblib import Parallel, delayed

result = Parallel(n_jobs=-1)([delayed(process)(n) for n in range(1000)])

Vous pouvez exécuter la fonction process en parallèle en écrivant le processus de parallélisation comme ceci. Vous pouvez changer le nombre de cœurs à utiliser en changeant le contenu de n_jobs. Il sera exécuté avec le nombre maximum de cœurs pouvant être utilisé avec -1, et s'il vaut 1, ce sera la même situation qu'une exécution simple. C'est facile et bon.

Même si vous spécifiez plus que le nombre de cœurs disponibles dans n_jobs, il ne tombera pas, et il semble que le traitement sera distribué aux cœurs disponibles comme il convient.

En plus de n_jobs, vous pouvez prendre un argument détaillé, et si vous spécifiez une valeur de 0 à 10, la progression sera sortie en fonction de la fréquence spécifiée.

multiprocessing

Étant donné que le multitraitement est une bibliothèque Python standard, il peut être utilisé sans aucune installation particulière. Il existe de nombreuses fonctions, mais la manière la plus simple de les utiliser est

from multiprocessing import Pool
import multiprocessing as multi

p = Pool(multi.cpu_count())
p.map(process, list(range(1000)))
p.close()

On dirait. Spécifiez le nombre de parallèles avec Pool, et exécutez la fonction en parallèle avec map. Vous pouvez obtenir le nombre maximum de cœurs exécutables avec multi.cpu_count (). Veuillez noter que si vous ne terminez pas le Pool créé avec close (), cela consommera de la mémoire et ce sera un gros problème.

Différence d'utilisabilité

Le plus grand avantage de joblib par rapport au multitraitement est que vous pouvez prendre autre chose qu'un tableau comme argument de la fonction à paralléliser.

Par exemple

def process(i, s):
    return [{'id': i, 'str': s, 'sum': sum(range(i*j))} for j in range(100)]

En supposant qu'il existe une fonction qui prend plusieurs arguments comme Dans joblib

strs = ['a', 'b', 'c']
result = Parallel(n_jobs=job)([delayed(process)(i,s) for i,s in enumerate(strs * 1000)])

Vous pouvez le faire comme ça, mais même si vous essayez de l'exécuter avec le multitraitement de la même manière,

p = Pool(multi.cpu_count())
strs = ['a', 'b', 'c']
p.map(process, enumerate(strs * 1000))
p.close()

Je me fâche contre l'argument et j'obtiens une erreur.

TypeError: process() missing 1 required positional argument:

Dans ce cas, la fonction d'exécution

def process(v):
    return [{'id': v[0], 'str': v[1], 'sum': sum(range(v[0]*j))} for j in range(100)]

Il est nécessaire de changer pour qu'un tableau soit pris comme argument.

Comparaison de vitesse

Maintenant, comparons la vitesse du sujet principal. Pour la fonction d'exécution

def process(n):
    return sum([i*n for i in range(100000)])

Essayez une exécution parallèle pour chacun et mesurez la vitesse.

def usejoblib(job, num):
    Parallel(n_jobs=job)([delayed(process)(n) for n in range(num)])


def usemulti(job, num):
    p = Pool(multi.cpu_count() if job < 0 else job)
    p.map(process, list(range(num)))
    p.close()

Voici le résultat de la mesure 10 fois avec un nombre différent de boucles et de la moyenne de chacune (8 tâches, l'unité est la seconde)

loop_n normal Joblib multiprocessing
10 0.0441 0.113 0.0217
100 0.414 0.211 0.139
1000 4.16 1.32 1.238
10000 41.1 12.5 12.2
100000 430 123 119

Les deux sont beaucoup plus rapides que sans parallélisation. Il semble qu'il n'y ait pas beaucoup de différence de vitesse entre les deux (la version multitraitement est-elle légèrement plus rapide?).

Si vous essayez d'augmenter la quantité de traitement dans une boucle

def process(i):
    return [{'id': j, 'sum': sum(range(i*j))} for j in range(1000)]
loop_n normal Joblib multiprocessing
10 0.25 0.21 0.07
100 28.4 13.2 7.53
1000 - 737 701

Encore une fois, le multitraitement était un peu plus rapide, mais je ne savais pas pourquoi. S'il y a une telle différence, le résultat peut être inversé en fonction de l'environnement d'exécution et de la charge de traitement de la fonction, mais dans tous les cas, ce sera beaucoup plus rapide que le traitement en boucle normal, alors soyez agressif lorsque vous effectuez un traitement en boucle avec Python. Je veux l'utiliser comme cible.

Enfin, je mettrai le code exécuté cette fois.

import time
from joblib import Parallel, delayed
from multiprocessing import Pool
import multiprocessing as multi
from more_itertools import flatten
import sys
import functools


def process(i):
    return [{'id': j, 'sum': sum(range(i*j))} for j in range(1000)]


#def process(n):
#    return sum([i*n for i in range(100000)])


def usejoblib(job, num):
    result =Parallel(n_jobs=job)([delayed(process)(n) for n in range(num)])
    return result


def usemulti(job, num):
    p = Pool(multi.cpu_count() if job < 0 else job)
    result = p.map(process, range(num))
    p.close()
    return result

if __name__ == '__main__':
    argv = sys.argv
    total = 0
    n = 1
    
    for i in range(n):
        s = time.time()
        if argv[1] == 'joblib':
            result = usejoblib(int(argv[2]),int(argv[3]))
        elif argv[1] == 'multi':
            result = usemulti(int(argv[2]),int(argv[3]))
        else:
            result = [process(j) for j in range(int(argv[3]))]
        elapsed = time.time()-s
        print('time: {0} [sec]'.format(elapsed))
        total += elapsed
        
    print('--------')
    print('average: {0} [sec]'.format(total/n))

    sums = functools.reduce(lambda x, y: x + y['sum'], list(flatten(result)), 0)
    print('total: {0}'.format(sums))
#    print('total: {0}'.format(sum(result)))

Recommended Posts

Traitement parallèle Python (multitraitement et Joblib)
[Python] Traitement parallèle facile avec Joblib
Traitement parallèle avec multitraitement
Résumé de l'exemple de code de traitement parallèle / parallèle Python
mappe de traitement de chaîne python et lambda
Communication socket et traitement multi-thread par Python
Compréhension complète du threading Python et du multitraitement
traitement d'image python
Traitement de fichiers Python
Partage de données de type liste entre processus par traitement parallèle à l'aide du multitraitement Python
Comment faire un traitement parallèle multicœur avec python
Traitement Y / n avec bash, Python et Go
Traitement parallèle sans signification profonde en Python
Comment prendre plusieurs arguments lors d'un traitement parallèle à l'aide du multitraitement en python
[python] Compresser et décompresser
Astuces Python et Numpy
[Python] pip et roue
Traitement pleine largeur et demi-largeur des données CSV en Python
Traitement de fichiers en Python
Python: traitement du langage naturel
Traitement de la communication par Python
Traitement multithread en python
Itérateur et générateur Python
Remarques sur le traitement d'images HDR et RAW avec Python
[Python] Mesure et affiche le temps nécessaire au traitement
Premier traitement d'image Python
Paquets et modules Python
Intégration Vue-Cli et Python
Téléchargement parallèle avec Python
Ruby, Python et carte
Introduction au traitement parallèle distribué Python par Ray
Utilisez SQL Alchemy et le multitraitement
Traitement de texte avec Python
Traitement des requêtes en Python
entrée et sortie python
Python et Ruby se séparent
Traitement d'image avec Python
Traitement asynchrone de Python ~ Comprenez parfaitement async et attendez ~
Traitement parallèle prenant en charge plusieurs arguments et le nombre de processus spécifiés par le multitraitement [Enregistrer PDF]
Python asyncio et ContextVar
Illustration de traitement de chaîne Python
Récapitulatif du traitement de la date en Python (datetime et dateutil)
Divers traitements de Python
Le traitement parallèle de Python joblib ne fonctionne pas dans l'environnement uWSGI. Comment traiter en parallèle sur uWSGI?
[Jouons avec Python] Traitement d'image en monochrome et points
Traitement d'image avec Python (partie 2)
Programmation avec Python et Tkinter
Chiffrement et déchiffrement avec Python
python3 Mesurez la vitesse de traitement.
Python: variables de classe et d'instance
Traitement parallèle avec des fonctions locales
3-3, chaîne Python et code de caractère
Série Python 2 et série 3 (édition Anaconda)
"Traitement Apple" avec OpenCV3 + Python3
Python et matériel - Utilisation de RS232C avec Python -
Python sur Ruby et Ruby en colère sur Python
Indentation Python et format de chaîne
division des nombres réels python (/) et division des nombres entiers (//)
Installez Python et Flask (Windows 10)