J'ai mesuré différentes méthodes de communication inter-processus en multitraitement de python3

Si vous implémentiez la communication interprocessus dans le multiprocessus de Python L'échange d'une grande quantité de données a pris énormément de temps.

Alors, j'ai mesuré chaque méthode. La version de python est la 3.7.

Sommaire des résultats

Les résultats sont les suivants.

send time recv time Contraintes de données Gestion d'écriture séparée Remarques
Queue 0.00s 5.33s Choses qui peuvent être marinées Inutile
Pipe 3.12s 5.33s Choses qui peuvent être marinées Inutile
la memoire partagée(Array) 3.02s 2.55s Tableau à 1 dimension nécessaire Les données sont contraintes
Manager 10.19s 10.29s Tableaux, dictionnaires, etc. Inutile
RawArray 5.61s(Y compris numpy) 0.18s Tableau à 1 dimension nécessaire Code rapide mais compliqué
File 3.86s 5.26s Choses qui peuvent être marinées nécessaire Via fichier
socket 4.13s(Y compris cornichon) 5.34s(Y compris cornichon) Choses qui peuvent être marinées nécessaire Le code est compliqué.

Différence de vitesse en fonction du nombre de données

Prenons d'abord Pipe comme exemple pour voir à quel point la vitesse diffère en fonction du nombre de données.

En tant que méthode d'évaluation, le transfert de données n'est effectué qu'une seule fois et la taille de la matrice au moment du transfert est modifiée. Le processus p1 est celui qui envoie les données et ne les envoie qu'une seule fois. Le processus p2 est un processus qui continue d'attendre des données et se termine lorsqu'il est reçu.

code

import multiprocessing as mp
import time

def pipe1(*args):
    p = args[0]
    key = args[1]
    size = args[2]

    d = [i for i in range(size)]

    print("[{}]send start".format(key))
    t0 = time.time()
    p.send(d)
    print("[{}]send end: {}s".format(key, time.time()-t0))


def pipe2(*args):
    p = args[0]
    key = args[1]
    
    while True:
        if not p.poll():
            continue

        print("[{}]recv start".format(key))
        t0 = time.time()
        p.recv()
        assert d[5]==5
        print("[{}]recv end: {}s".format(key, time.time()-t0))
        break


def main_pipe(key, size=100000000):
    reciver, sender = mp.Pipe(duplex=False)
    p1 = mp.Process(target=pipe1, args=(sender, key, size))
    p2 = mp.Process(target=pipe2, args=(reciver, key))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

if __name__ == '__main__':
    main_pipe("pipe10", 10)
    main_pipe("pipe10^5", 100000)
    main_pipe("pipe10^6", 1000000)
    main_pipe("pipe10^7", 10000000)
    main_pipe("pipe10^8", 100000000)

Une déclaration d'assertion est également incluse pour confirmer que les données ont été reçues.

Résultat d'exécution

Temps de transmission Heure de réception
pipe10 0.0s 0.0s
pipe10^5 0.0109s 0.0039s
pipe10^6 0.0324s 0.0708s
pipe10^7 0.3240s 0.6425s
pipe10^8 3.2228s 6.4095s

Comparaison par chaque méthode de transmission de données

Le code est un extrait de la seule façon d'envoyer et de recevoir chaque donnée. La taille des données est fixée à 10 ^ 8.

Voir la fin de l'article pour le code complet.

Référence: Compréhension complète du thread Python et du multitraitement

Queue

import multiprocessing as mp

#Créer(Extrait)
q = mp.Queue()

#Envoyer(Extrait)
p.put(d)

#Recevoir(Extrait)
if not p.empty():
    d = p.get()

Résultat d'exécution

[queue]send start
[queue]send end: 0.0s
[queue]recv start
[queue]recv end: 5.337700128555298s

Pipe Pipe a une implémentation unidirectionnelle.

import multiprocessing as mp

#Créer(Extrait)
reciver, sender = mp.Pipe(duplex=False)

#Envoyer(Extrait)
sender.send(d)

#Recevoir(Extrait)
if p.poll():
    reciver = p.recv()

Résultat d'exécution

[pipe]send start
[pipe]recv start
[pipe]send end: 3.121206045150757s
[pipe]recv end: 5.337015151977539s

Mémoire partagée (baie)

Puisqu'il n'y a rien dans la mémoire partagée elle-même pour déterminer si l'écriture est terminée Une variable Value (mémoire partagée) distincte est ajoutée.

import multiprocessing as mp
import ctypes

#Créer(Extrait)
arr = mp.Array(ctypes.c_int, [0]*size )
flag = mp.Value(ctypes.c_bool, False)

#Envoyer(Extrait)
p[:] = d
flag.value = True

#Recevoir(Extrait)
if flag.value:
    d = p[:]

Résultat d'exécution

[array]send start
[array]send end: 3.0218513011932373s
[array]recv start
[array]recv end: 2.5539581775665283s

Manager

import multiprocessing as mp

#Créer(Extrait)
with mp.Manager() as manager:
    shared_arr = manager.list()
    
#Envoyer(Extrait)
shared_arr.append(d)

#Recevoir(Extrait)
if len(p) > 0:
    d = p.pop()

Résultat d'exécution

[manager]send start
[manager]send end: 10.194092750549316s
[manager]recv start
[manager]recv end: 10.295690059661865s

RawArray

Je fais référence à ce qui suit.

Identique à la mémoire partagée (Array), une variable d'indicateur pour l'achèvement de la transmission est préparée séparément. De plus, le temps requis pour la conversion numpy est mesuré séparément.

import multiprocessing.sharedctypes
import ctypes
import numpy as np

#Créer(Extrait)
byte_buf = multiprocessing.sharedctypes.RawArray(ctypes.c_int, size)
flag = mp.Value(ctypes.c_bool, False)

#Envoyer(Extrait)
d = np.asarray(d)
np.asarray(p)[:] = d
flag.value = True

#Recevoir(Extrait)
if flag.value:
    d = np.array(p, dtype=np.uint8, copy=True)

Résultat d'exécution

[raw]cast numpy time: 5.4573814868927s
[raw]send start
[raw]send end: 0.15658187866210938s
[raw]recv start
[raw]recv end: 0.18018245697021484s

File

Lors du changement de direction et de l'échange via des fichiers au lieu de la mémoire. De plus, comme pour la mémoire partagée, la fin de l'écriture est gérée par la variable flag.

import multiprocessing as mp
import pickle
import tempfile
import os

#Créer(Extrait)
with tempfile.TemporaryDirectory() as tmpdir:
    flag = mp.Value(ctypes.c_bool, False)

#Envoyer(Extrait)
with open(os.path.join(tmpdir, 'testfile'), 'wb') as f:
    pickle.dump(d, f)
flag.value = True

#Recevoir(Extrait)
if flag.value:
    with open(os.path.join(p, 'testfile'), 'r+b') as f:
        d = pickle.load(f)

Résultat d'exécution

[file]send start
[file]send end: 3.8698363304138184s
[file]recv start
[file]recv end: 5.267671585083008s

socket

Il s'agit d'un échange par communication socket en changeant encore la direction. La variable flag est utilisée pour confirmer le démarrage du serveur. De plus, la conversion des cornichons est mesurée séparément.

import multiprocessing as mp
import socket
import pickle

#Créer(Extrait)
flag = mp.Value(ctypes.c_bool, False)

#Envoyer (client)(Extrait)
d = pickle.dumps(d)
if flag.value:
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect(('127.0.0.1', 50007))
        s.sendall(d)

#Recevoir (serveur)(Extrait)
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.bind(('127.0.0.1', 50007))
    s.listen(1)
    flag.value = True
    while True:
        conn, addr = s.accept()
        d = b""
        with conn:
            while True:
                #Recevoir des données
                data = conn.recv(1024*1024*1024)
                if not data:
                    break
                d += data
        d = pickle.loads(d)

Résultat d'exécution

[socket]server wait start
[socket]pickle pack time: 3.798427104949951s
[socket]send start
[socket]recv start
[socket]send end: 0.3363354206085205s
[socket]recv end: 0.5375902652740479s
[socket]pickle unpack time: 4.91701340675354s

Code entier

import multiprocessing as mp
import multiprocessing.sharedctypes
import time
import ctypes
import numpy as np
import tempfile
import pickle
import os
import socket


def queue1(*args):
    p = args[0]
    key = args[1]
    size = args[2]

    d = [i for i in range(size)]

    print("[{}]send start".format(key))
    t0 = time.time()
    p.put(d)
    print("[{}]send end: {}s".format(key, time.time()-t0))

def queue2(*args):
    p = args[0]
    key = args[1]
    
    while True:
        if p.empty():
            continue
        print("[{}]recv start".format(key))
        t0 = time.time()
        d = p.get()
        assert d[5]==5
        print("[{}]recv end: {}s".format(key, time.time()-t0))
        break

def main_queue(key, size=10000*10000):
    q = mp.Queue()
    p1 = mp.Process(target=queue1, args=(q, key, size))
    p2 = mp.Process(target=queue2, args=(q, key))
    p1.start()
    p2.start()
    p1.join()
    p2.join()




def pipe1(*args):
    p = args[0]
    key = args[1]
    size = args[2]

    d = [i for i in range(size)]

    print("[{}]send start".format(key))
    t0 = time.time()
    p.send(d)
    print("[{}]send end: {}s".format(key, time.time()-t0))

def pipe2(*args):
    p = args[0]
    key = args[1]
    
    while True:
        if not p.poll():
            continue
        print("[{}]recv start".format(key))
        t0 = time.time()
        d = p.recv()
        assert d[5]==5
        print("[{}]recv end: {}s".format(key, time.time()-t0))
        break

def main_pipe(key, size=10000*10000):
    reciver, sender = mp.Pipe(duplex=False)
    p1 = mp.Process(target=pipe1, args=(sender, key, size))
    p2 = mp.Process(target=pipe2, args=(reciver, key))
    p1.start()
    p2.start()
    p1.join()
    p2.join()



def array1(*args):
    p = args[0]
    flag = args[1]
    key = args[2]
    size = args[3]

    d = [i for i in range(size)]

    print("[{}]send start".format(key))
    t0 = time.time()
    p[:] = d
    flag.value = True
    print("[{}]send end: {}s".format(key, time.time()-t0))

def array2(*args):
    p = args[0]
    flag = args[1]
    key = args[2]

    while True:
        if not flag.value:  #Attendez que les données changent
            continue
        print("[{}]recv start".format(key))
        t0 = time.time()
        d = p[:]
        assert d[5]==5
        print("[{}]recv end: {}s".format(key, time.time()-t0))
        break

def main_array(key, size=10000*10000):
    arr = mp.Array(ctypes.c_int, [0]*size )
    flag = mp.Value(ctypes.c_bool, False)
    p1 = mp.Process(target=array1, args=(arr, flag, key, size))
    p2 = mp.Process(target=array2, args=(arr, flag, key))
    p1.start()
    p2.start()
    p1.join()
    p2.join()



def manager1(*args):
    p = args[0]
    key = args[1]
    size = args[2]

    d = [i for i in range(size)]
    
    print("[{}]send start".format(key))
    t0 = time.time()
    p.append(d)
    print("[{}]send end: {}s".format(key, time.time()-t0))

def manager2(*args):
    p = args[0]
    key = args[1]

    while True:
        if len(p) == 0:
            continue
        print("[{}]recv start".format(key))
        t0 = time.time()
        d = p.pop()
        assert d[5]==5
        print("[{}]recv end: {}s".format(key, time.time()-t0))
        break

def main_manager(key, size=10000*10000):
    with mp.Manager() as manager:
        shared_arr = manager.list()
        p1 = mp.Process(target=manager1, args=(shared_arr, key, size))
        p2 = mp.Process(target=manager2, args=(shared_arr, key))
        p1.start()
        p2.start()
        p1.join()
        p2.join()



def raw1(*args):
    p = args[0]
    flag = args[1]
    key = args[2]
    size = args[3]

    d = [i for i in range(size)]
    
    t0 = time.time()
    d = np.asarray(d)
    print("[{}]cast numpy time: {}s".format(key, time.time()-t0))

    print("[{}]send start".format(key))
    t0 = time.time()
    np.asarray(p)[:] = d
    flag.value = True
    print("[{}]send end: {}s".format(key, time.time()-t0))

def raw2(*args):
    p = args[0]
    flag = args[1]
    key = args[2]

    while True:
        if not flag.value:  #Attendez que les données changent
            continue
        print("[{}]recv start".format(key))
        t0 = time.time()
        d = np.array(p, dtype=np.uint8, copy=True)
        assert d[5]==5
        print("[{}]recv end: {}s".format(key, time.time()-t0))
        break

def main_raw(key, size=10000*10000):
    byte_buf = multiprocessing.sharedctypes.RawArray(ctypes.c_int, size)
    flag = mp.Value(ctypes.c_bool, False)
    p1 = mp.Process(target=raw1, args=(byte_buf, flag, key, size))
    p2 = mp.Process(target=raw2, args=(byte_buf, flag, key))
    p1.start()
    p2.start()
    p1.join()
    p2.join()



def file1(*args):
    tmpdir = args[0]
    flag = args[1]
    key = args[2]
    size = args[3]

    d = [i for i in range(size)]

    print("[{}]send start".format(key))
    t0 = time.time()
    with open(os.path.join(tmpdir, 'testfile'), 'wb') as f:
        pickle.dump(d, f)
    flag.value = True
    print("[{}]send end: {}s".format(key, time.time()-t0))

def file2(*args):
    tmpdir = args[0]
    flag = args[1]
    key = args[2]

    while True:
        if not flag.value:  #Attendez que les données changent
            continue
        print("[{}]recv start".format(key))
        t0 = time.time()
        with open(os.path.join(tmpdir, 'testfile'), 'rb') as f:
            d = pickle.load(f)
        print("[{}]recv end: {}s".format(key, time.time()-t0))
        assert d[5]==5
        break

def main_file(key, size=10000*10000):
    with tempfile.TemporaryDirectory() as tmpdir:
        flag = mp.Value(ctypes.c_bool, False)
        p1 = mp.Process(target=file1, args=(tmpdir, flag, key, size))
        p2 = mp.Process(target=file2, args=(tmpdir, flag, key))
        p1.start()
        p2.start()
        p1.join()
        p2.join()



def socket1(*args):
    flag = args[0]
    key = args[1]
    size = args[2]

    d = [i for i in range(size)]

    t0 = time.time()
    d = pickle.dumps(d)
    print("[{}]pickle pack time: {}s".format(key, time.time()-t0))

    while True:
        if not flag.value:
            continue
        print("[{}]send start".format(key))
        t0 = time.time()
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.connect(('127.0.0.1', 50007))
            s.sendall(d)
        print("[{}]send end: {}s".format(key, time.time()-t0))
        break

def socket2(*args):
    flag = args[0]
    key = args[1]

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(('127.0.0.1', 50007))
        s.listen(1)
        flag.value = True
        while True:
            print("[{}]server wait start".format(key))
            conn, addr = s.accept()
            print("[{}]recv start".format(key))
            t0 = time.time()

            d = b""
            with conn:
                while True:
                    #Recevoir des données
                    data = conn.recv(1024*1024*1024)
                    if not data:
                        break
                    d += data
            print("[{}]recv end: {}s".format(key, time.time()-t0))

            t0 = time.time()
            d = pickle.loads(d)
            print("[{}]pickle unpack time: {}s".format(key, time.time()-t0))

            assert d[5]==5
            break

def main_socket(key, size=10000*10000):
    flag = mp.Value(ctypes.c_bool, False)
    p1 = mp.Process(target=socket1, args=(flag, key, size))
    p2 = mp.Process(target=socket2, args=(flag, key,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()



if __name__ == '__main__':
    main_pipe("pipe10", 10)
    main_pipe("pipe10^5", 100000)
    main_pipe("pipe10^6", 1000000)
    main_pipe("pipe10^7", 10000000)
    main_pipe("pipe10^8", 100000000)

    main_queue("queue")
    main_pipe("pipe")
    main_array("array")
    main_manager("manager")
    main_raw("raw")
    main_file("file")
    main_socket("socket")

Recommended Posts

J'ai mesuré différentes méthodes de communication inter-processus en multitraitement de python3
Résumé de diverses instructions for en Python
Résumé des méthodes intégrées, etc. de la liste Python
Types de communication inter-processus
Divers traitements de Python
Puisque le memory_profiler de python est lourd, je l'ai mesuré
L'ancien openssl pose des problèmes dans diverses parties de python
J'ai écrit python en japonais
J'ai essayé différentes méthodes pour envoyer du courrier japonais avec Python
Méthodes d'objet chaîne en Python
À propos de divers encodages de Python 3
Jugement d'équivalence d'objet en Python
Appeler dynamiquement des méthodes en Python
Je comprends Python en japonais!
Implémentation du tri rapide en Python
Ce que j'ai appris en Python
J'ai essayé d'implémenter le blackjack du jeu Trump en Python
J'ai implémenté N-Queen dans différentes langues et mesuré la vitesse
J'ai comparé le temps de calcul de la moyenne mobile écrite en Python
Ecrire diverses formes d'arbres phylogénétiques en Python à l'aide de la boîte à outils ETE
J'ai écrit le code pour écrire le code Brainf * ck en python
Différentes façons de créer un tableau de nombres de 1 à 10 en Python.
[Classification des phrases] J'ai essayé différentes méthodes de mise en commun des réseaux de neurones convolutifs
Diviser timedelta dans la série Python 2.7
Échappement automatique des paramètres MySQL en python
Gestion des fichiers JSON en Python
Implémentation du jeu de vie en Python
Résumé des différentes opérations dans Tensorflow
J'ai écrit Fizz Buzz en Python
Obtenir, publier un mémo de communication en Python
J'ai essayé d'étudier le processus avec Python
Scikit-learn ne peut pas être installé en Python
J'ai écrit la file d'attente en Python
Définir dynamiquement des fonctions (méthodes) en Python
La loi des nombres en python
Implémentation du tri original en Python
Brouillage réversible d'entiers en Python
J'ai essayé la notification de ligne en Python
J'ai essayé la communication SMTP avec Python
J'ai écrit la pile en Python
Combien de types de Python avez-vous dans votre Windows 10? J'avais 5 types.
Je veux convertir par lots le résultat de "chaîne de caractères" .split () en Python
Je veux expliquer en détail la classe abstraite (ABCmeta) de Python
Je veux colorer une partie de la chaîne Excel avec Python
J'ai essayé différents modèles de chaînes de date à saisir dans pandas.to_datetime
J'ai essayé différentes versions de l'environnement Python + OpenCV + FFmpeg sur Mac
Caractéristiques des modules d'expressions régulières qui sont souvent utilisés personnellement en Python
J'ai fait un programme pour vérifier la taille d'un fichier avec Python
J'ai mesuré la vitesse de la notation d'inclusion de liste, pendant et pendant avec python2.7.
J'ai essayé d'implémenter le jeu de cartes de Trump en Python
J'ai essayé de toucher des méthodes liées au toucher dans le module de scène de pythonista
J'ai mis Python 2.7 dans Sakura VPS 1 Go.
J'ai essayé d'implémenter PLSA en Python
Conversion de la chaîne <-> date (date, datetime) en Python
Vérifiez le comportement du destroyer en Python
J'ai essayé d'implémenter la permutation en Python
Résumé des méthodes fréquemment utilisées chez les pandas
Pratique d'utilisation de ceci en Python (mauvais)
Théorie générale de la relativité en Python: Introduction
J'ai fait un programme de gestion de la paie en Python!
Arborescence de sortie des fichiers en Python