Si vous implémentiez la communication interprocessus dans le multiprocessus de Python L'échange d'une grande quantité de données a pris énormément de temps.
Alors, j'ai mesuré chaque méthode. La version de python est la 3.7.
Les résultats sont les suivants.
send time | recv time | Contraintes de données | Gestion d'écriture séparée | Remarques | |
---|---|---|---|---|---|
Queue | 0.00s | 5.33s | Choses qui peuvent être marinées | Inutile | |
Pipe | 3.12s | 5.33s | Choses qui peuvent être marinées | Inutile | |
la memoire partagée(Array) | 3.02s | 2.55s | Tableau à 1 dimension | nécessaire | Les données sont contraintes |
Manager | 10.19s | 10.29s | Tableaux, dictionnaires, etc. | Inutile | |
RawArray | 5.61s(Y compris numpy) | 0.18s | Tableau à 1 dimension | nécessaire | Code rapide mais compliqué |
File | 3.86s | 5.26s | Choses qui peuvent être marinées | nécessaire | Via fichier |
socket | 4.13s(Y compris cornichon) | 5.34s(Y compris cornichon) | Choses qui peuvent être marinées | nécessaire | Le code est compliqué. |
Prenons d'abord Pipe comme exemple pour voir à quel point la vitesse diffère en fonction du nombre de données.
En tant que méthode d'évaluation, le transfert de données n'est effectué qu'une seule fois et la taille de la matrice au moment du transfert est modifiée. Le processus p1 est celui qui envoie les données et ne les envoie qu'une seule fois. Le processus p2 est un processus qui continue d'attendre des données et se termine lorsqu'il est reçu.
import multiprocessing as mp
import time
def pipe1(*args):
p = args[0]
key = args[1]
size = args[2]
d = [i for i in range(size)]
print("[{}]send start".format(key))
t0 = time.time()
p.send(d)
print("[{}]send end: {}s".format(key, time.time()-t0))
def pipe2(*args):
p = args[0]
key = args[1]
while True:
if not p.poll():
continue
print("[{}]recv start".format(key))
t0 = time.time()
p.recv()
assert d[5]==5
print("[{}]recv end: {}s".format(key, time.time()-t0))
break
def main_pipe(key, size=100000000):
reciver, sender = mp.Pipe(duplex=False)
p1 = mp.Process(target=pipe1, args=(sender, key, size))
p2 = mp.Process(target=pipe2, args=(reciver, key))
p1.start()
p2.start()
p1.join()
p2.join()
if __name__ == '__main__':
main_pipe("pipe10", 10)
main_pipe("pipe10^5", 100000)
main_pipe("pipe10^6", 1000000)
main_pipe("pipe10^7", 10000000)
main_pipe("pipe10^8", 100000000)
Une déclaration d'assertion est également incluse pour confirmer que les données ont été reçues.
Temps de transmission | Heure de réception | |
---|---|---|
pipe10 | 0.0s | 0.0s |
pipe10^5 | 0.0109s | 0.0039s |
pipe10^6 | 0.0324s | 0.0708s |
pipe10^7 | 0.3240s | 0.6425s |
pipe10^8 | 3.2228s | 6.4095s |
Le code est un extrait de la seule façon d'envoyer et de recevoir chaque donnée. La taille des données est fixée à 10 ^ 8.
Voir la fin de l'article pour le code complet.
Référence: Compréhension complète du thread Python et du multitraitement
Queue
import multiprocessing as mp
#Créer(Extrait)
q = mp.Queue()
#Envoyer(Extrait)
p.put(d)
#Recevoir(Extrait)
if not p.empty():
d = p.get()
Résultat d'exécution
[queue]send start
[queue]send end: 0.0s
[queue]recv start
[queue]recv end: 5.337700128555298s
Pipe Pipe a une implémentation unidirectionnelle.
import multiprocessing as mp
#Créer(Extrait)
reciver, sender = mp.Pipe(duplex=False)
#Envoyer(Extrait)
sender.send(d)
#Recevoir(Extrait)
if p.poll():
reciver = p.recv()
Résultat d'exécution
[pipe]send start
[pipe]recv start
[pipe]send end: 3.121206045150757s
[pipe]recv end: 5.337015151977539s
Puisqu'il n'y a rien dans la mémoire partagée elle-même pour déterminer si l'écriture est terminée Une variable Value (mémoire partagée) distincte est ajoutée.
import multiprocessing as mp
import ctypes
#Créer(Extrait)
arr = mp.Array(ctypes.c_int, [0]*size )
flag = mp.Value(ctypes.c_bool, False)
#Envoyer(Extrait)
p[:] = d
flag.value = True
#Recevoir(Extrait)
if flag.value:
d = p[:]
Résultat d'exécution
[array]send start
[array]send end: 3.0218513011932373s
[array]recv start
[array]recv end: 2.5539581775665283s
Manager
import multiprocessing as mp
#Créer(Extrait)
with mp.Manager() as manager:
shared_arr = manager.list()
#Envoyer(Extrait)
shared_arr.append(d)
#Recevoir(Extrait)
if len(p) > 0:
d = p.pop()
Résultat d'exécution
[manager]send start
[manager]send end: 10.194092750549316s
[manager]recv start
[manager]recv end: 10.295690059661865s
RawArray
Je fais référence à ce qui suit.
Identique à la mémoire partagée (Array), une variable d'indicateur pour l'achèvement de la transmission est préparée séparément. De plus, le temps requis pour la conversion numpy est mesuré séparément.
import multiprocessing.sharedctypes
import ctypes
import numpy as np
#Créer(Extrait)
byte_buf = multiprocessing.sharedctypes.RawArray(ctypes.c_int, size)
flag = mp.Value(ctypes.c_bool, False)
#Envoyer(Extrait)
d = np.asarray(d)
np.asarray(p)[:] = d
flag.value = True
#Recevoir(Extrait)
if flag.value:
d = np.array(p, dtype=np.uint8, copy=True)
Résultat d'exécution
[raw]cast numpy time: 5.4573814868927s
[raw]send start
[raw]send end: 0.15658187866210938s
[raw]recv start
[raw]recv end: 0.18018245697021484s
File
Lors du changement de direction et de l'échange via des fichiers au lieu de la mémoire. De plus, comme pour la mémoire partagée, la fin de l'écriture est gérée par la variable flag.
import multiprocessing as mp
import pickle
import tempfile
import os
#Créer(Extrait)
with tempfile.TemporaryDirectory() as tmpdir:
flag = mp.Value(ctypes.c_bool, False)
#Envoyer(Extrait)
with open(os.path.join(tmpdir, 'testfile'), 'wb') as f:
pickle.dump(d, f)
flag.value = True
#Recevoir(Extrait)
if flag.value:
with open(os.path.join(p, 'testfile'), 'r+b') as f:
d = pickle.load(f)
Résultat d'exécution
[file]send start
[file]send end: 3.8698363304138184s
[file]recv start
[file]recv end: 5.267671585083008s
socket
Il s'agit d'un échange par communication socket en changeant encore la direction. La variable flag est utilisée pour confirmer le démarrage du serveur. De plus, la conversion des cornichons est mesurée séparément.
import multiprocessing as mp
import socket
import pickle
#Créer(Extrait)
flag = mp.Value(ctypes.c_bool, False)
#Envoyer (client)(Extrait)
d = pickle.dumps(d)
if flag.value:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect(('127.0.0.1', 50007))
s.sendall(d)
#Recevoir (serveur)(Extrait)
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(('127.0.0.1', 50007))
s.listen(1)
flag.value = True
while True:
conn, addr = s.accept()
d = b""
with conn:
while True:
#Recevoir des données
data = conn.recv(1024*1024*1024)
if not data:
break
d += data
d = pickle.loads(d)
Résultat d'exécution
[socket]server wait start
[socket]pickle pack time: 3.798427104949951s
[socket]send start
[socket]recv start
[socket]send end: 0.3363354206085205s
[socket]recv end: 0.5375902652740479s
[socket]pickle unpack time: 4.91701340675354s
import multiprocessing as mp
import multiprocessing.sharedctypes
import time
import ctypes
import numpy as np
import tempfile
import pickle
import os
import socket
def queue1(*args):
p = args[0]
key = args[1]
size = args[2]
d = [i for i in range(size)]
print("[{}]send start".format(key))
t0 = time.time()
p.put(d)
print("[{}]send end: {}s".format(key, time.time()-t0))
def queue2(*args):
p = args[0]
key = args[1]
while True:
if p.empty():
continue
print("[{}]recv start".format(key))
t0 = time.time()
d = p.get()
assert d[5]==5
print("[{}]recv end: {}s".format(key, time.time()-t0))
break
def main_queue(key, size=10000*10000):
q = mp.Queue()
p1 = mp.Process(target=queue1, args=(q, key, size))
p2 = mp.Process(target=queue2, args=(q, key))
p1.start()
p2.start()
p1.join()
p2.join()
def pipe1(*args):
p = args[0]
key = args[1]
size = args[2]
d = [i for i in range(size)]
print("[{}]send start".format(key))
t0 = time.time()
p.send(d)
print("[{}]send end: {}s".format(key, time.time()-t0))
def pipe2(*args):
p = args[0]
key = args[1]
while True:
if not p.poll():
continue
print("[{}]recv start".format(key))
t0 = time.time()
d = p.recv()
assert d[5]==5
print("[{}]recv end: {}s".format(key, time.time()-t0))
break
def main_pipe(key, size=10000*10000):
reciver, sender = mp.Pipe(duplex=False)
p1 = mp.Process(target=pipe1, args=(sender, key, size))
p2 = mp.Process(target=pipe2, args=(reciver, key))
p1.start()
p2.start()
p1.join()
p2.join()
def array1(*args):
p = args[0]
flag = args[1]
key = args[2]
size = args[3]
d = [i for i in range(size)]
print("[{}]send start".format(key))
t0 = time.time()
p[:] = d
flag.value = True
print("[{}]send end: {}s".format(key, time.time()-t0))
def array2(*args):
p = args[0]
flag = args[1]
key = args[2]
while True:
if not flag.value: #Attendez que les données changent
continue
print("[{}]recv start".format(key))
t0 = time.time()
d = p[:]
assert d[5]==5
print("[{}]recv end: {}s".format(key, time.time()-t0))
break
def main_array(key, size=10000*10000):
arr = mp.Array(ctypes.c_int, [0]*size )
flag = mp.Value(ctypes.c_bool, False)
p1 = mp.Process(target=array1, args=(arr, flag, key, size))
p2 = mp.Process(target=array2, args=(arr, flag, key))
p1.start()
p2.start()
p1.join()
p2.join()
def manager1(*args):
p = args[0]
key = args[1]
size = args[2]
d = [i for i in range(size)]
print("[{}]send start".format(key))
t0 = time.time()
p.append(d)
print("[{}]send end: {}s".format(key, time.time()-t0))
def manager2(*args):
p = args[0]
key = args[1]
while True:
if len(p) == 0:
continue
print("[{}]recv start".format(key))
t0 = time.time()
d = p.pop()
assert d[5]==5
print("[{}]recv end: {}s".format(key, time.time()-t0))
break
def main_manager(key, size=10000*10000):
with mp.Manager() as manager:
shared_arr = manager.list()
p1 = mp.Process(target=manager1, args=(shared_arr, key, size))
p2 = mp.Process(target=manager2, args=(shared_arr, key))
p1.start()
p2.start()
p1.join()
p2.join()
def raw1(*args):
p = args[0]
flag = args[1]
key = args[2]
size = args[3]
d = [i for i in range(size)]
t0 = time.time()
d = np.asarray(d)
print("[{}]cast numpy time: {}s".format(key, time.time()-t0))
print("[{}]send start".format(key))
t0 = time.time()
np.asarray(p)[:] = d
flag.value = True
print("[{}]send end: {}s".format(key, time.time()-t0))
def raw2(*args):
p = args[0]
flag = args[1]
key = args[2]
while True:
if not flag.value: #Attendez que les données changent
continue
print("[{}]recv start".format(key))
t0 = time.time()
d = np.array(p, dtype=np.uint8, copy=True)
assert d[5]==5
print("[{}]recv end: {}s".format(key, time.time()-t0))
break
def main_raw(key, size=10000*10000):
byte_buf = multiprocessing.sharedctypes.RawArray(ctypes.c_int, size)
flag = mp.Value(ctypes.c_bool, False)
p1 = mp.Process(target=raw1, args=(byte_buf, flag, key, size))
p2 = mp.Process(target=raw2, args=(byte_buf, flag, key))
p1.start()
p2.start()
p1.join()
p2.join()
def file1(*args):
tmpdir = args[0]
flag = args[1]
key = args[2]
size = args[3]
d = [i for i in range(size)]
print("[{}]send start".format(key))
t0 = time.time()
with open(os.path.join(tmpdir, 'testfile'), 'wb') as f:
pickle.dump(d, f)
flag.value = True
print("[{}]send end: {}s".format(key, time.time()-t0))
def file2(*args):
tmpdir = args[0]
flag = args[1]
key = args[2]
while True:
if not flag.value: #Attendez que les données changent
continue
print("[{}]recv start".format(key))
t0 = time.time()
with open(os.path.join(tmpdir, 'testfile'), 'rb') as f:
d = pickle.load(f)
print("[{}]recv end: {}s".format(key, time.time()-t0))
assert d[5]==5
break
def main_file(key, size=10000*10000):
with tempfile.TemporaryDirectory() as tmpdir:
flag = mp.Value(ctypes.c_bool, False)
p1 = mp.Process(target=file1, args=(tmpdir, flag, key, size))
p2 = mp.Process(target=file2, args=(tmpdir, flag, key))
p1.start()
p2.start()
p1.join()
p2.join()
def socket1(*args):
flag = args[0]
key = args[1]
size = args[2]
d = [i for i in range(size)]
t0 = time.time()
d = pickle.dumps(d)
print("[{}]pickle pack time: {}s".format(key, time.time()-t0))
while True:
if not flag.value:
continue
print("[{}]send start".format(key))
t0 = time.time()
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect(('127.0.0.1', 50007))
s.sendall(d)
print("[{}]send end: {}s".format(key, time.time()-t0))
break
def socket2(*args):
flag = args[0]
key = args[1]
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(('127.0.0.1', 50007))
s.listen(1)
flag.value = True
while True:
print("[{}]server wait start".format(key))
conn, addr = s.accept()
print("[{}]recv start".format(key))
t0 = time.time()
d = b""
with conn:
while True:
#Recevoir des données
data = conn.recv(1024*1024*1024)
if not data:
break
d += data
print("[{}]recv end: {}s".format(key, time.time()-t0))
t0 = time.time()
d = pickle.loads(d)
print("[{}]pickle unpack time: {}s".format(key, time.time()-t0))
assert d[5]==5
break
def main_socket(key, size=10000*10000):
flag = mp.Value(ctypes.c_bool, False)
p1 = mp.Process(target=socket1, args=(flag, key, size))
p2 = mp.Process(target=socket2, args=(flag, key,))
p1.start()
p2.start()
p1.join()
p2.join()
if __name__ == '__main__':
main_pipe("pipe10", 10)
main_pipe("pipe10^5", 100000)
main_pipe("pipe10^6", 1000000)
main_pipe("pipe10^7", 10000000)
main_pipe("pipe10^8", 100000000)
main_queue("queue")
main_pipe("pipe")
main_array("array")
main_manager("manager")
main_raw("raw")
main_file("file")
main_socket("socket")
Recommended Posts