J'ai utilisé un objet Manager pour la gestion de l'état entre les processus.
#Afficher la liste toutes les 3 secondes def list_print(test_list): while True: print(str(test_list)) time.sleep(3)
#Ajouter un à la liste toutes les 2 secondes def list_append(test_list): while True: test_list.append("a") time.sleep(2)
if name == 'main': manager = Manager() test_list=manager.list() print("Liste avant l'exécution de la fonction" +str(test_list)) p1=Process(target=list_print,args=(test_list)) p2=Process(target=list_append,args=(test_list)) p1.start() p2.start() p1.join() p2.join()
<h2> 4. Résultat de l'exécution </ h2>
Traceback (most recent call last): Traceback (most recent call last): File "/Applications/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap self.run() File "/Applications/anaconda3/lib/python3.7/multiprocessing/process.py", line 99, in run self._target(*self._args, **self._kwargs) File "/Applications/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap self.run() File "/Applications/anaconda3/lib/python3.7/multiprocessing/process.py", line 99, in run self._target(*self._args, **self._kwargs) TypeError: list_append() missing 1 required positional argument: 'test_list' TypeError: list_print() missing 1 required positional argument: 'test_list'
Apparemment, la manière de prendre l'argument était mauvaise et une erreur s'est produite.
Je me suis demandé si l'objet Manager ne prend pas en charge les types de liste, mais il dit officiellement qu'il prend en charge les listes.
Alors après avoir lu la référence pendant un moment, j'ai trouvé une solution.
<h2> 5. Solution </ h2>
Il a été résolu en définissant un dictionnaire comme premier argument de la fonction à exécuter.
Il définit un dummy dictionnaire vide et le prend comme argument.
Le code est ci-dessous.
```python
from multiprocessing import Process,Manager
import time
#Afficher la liste
def list_print(dummy, test_list):
while True:
print(str(test_list))
time.sleep(3)
#Ajouter un à la liste
def list_append(dummy, test_list):
while True:
test_list.append("a")
time.sleep(2)
if __name__ == '__main__':
manager = Manager()
#Définir un dictionnaire vide
dummy = manager.dict()
test_list=manager.list()
print("Liste avant l'exécution de la fonction" +str(test_list))
#Ajouter un dictionnaire vide au premier argument
p1=Process(target=list_print,args=(dummy, test_list))
p2=Process(target=list_append,args=(dummy, test_list))
p1.start()
p2.start()
p1.join()
p2.join()
Liste avant l'exécution de la fonction[]
[]
['a', 'a']
['a', 'a', 'a']
['a', 'a', 'a', 'a', 'a']
c'est tout.
Recommended Posts