Future
Lorsque vous effectuez un calcul chronophage en Python et que vous souhaitez obtenir le résultat, vous placez généralement le processus dans une fonction et l'obtenez comme valeur de retour lorsque vous exécutez la fonction. C'est ce qu'on appelle le traitement synchrone.
D'autre part, le traitement asynchrone est un concept différent du traitement synchrone. Cela implique l'interaction suivante entre le processus qui demande le calcul (récepteur) et le processus qui effectue réellement le calcul (émetteur) via un objet appelé «Future».
Future
.Future
, et si le résultat du calcul est enregistré, obtenez le résultat du calcul.Le traitement jusqu'à ce point est par exemple le suivant.
import asyncio
import time
def f(future):
time.sleep(5) #Processus chronophage
future.set_result("hello")
return
future = asyncio.futures.Future()
f(future)
if future.done():
res = future.result()
print(res)
Quand je fais cela, il dit "bonjour" après avoir attendu 5 secondes.
Lib/asyncio/futures.py
class Future:
_state = _PENDING
_result = None
def done(self):
return self._state != _PENDING
def result(self):
if self._state != _FINISHED:
raise exceptions.InvalidStateError('Result is not ready.')
return self._result
def set_result(self, result):
if self._state != _PENDING:
raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
self._result = result
self._state = _FINISHED
Comme vous l'avez déjà remarqué, le code ci-dessus est le même qu'un appel de fonction normal, sauf qu'il utilise un objet Future
. En effet, le destinataire exécute directement le code de l'expéditeur. Cela ne profite pas de «Future».
C'est là que le concept de boucles d'événements entre en jeu. Une boucle d'événements est un objet qui a 0 ou 1 objet par thread et qui a pour fonction d'exécuter des fonctions enregistrées.
Utilisons-le réellement.
import asyncio
import time
def f(future):
time.sleep(5) #Processus chronophage
future.set_result("hello")
return
loop = asyncio.get_event_loop()
future = asyncio.futures.Future()
loop.call_soon(f, future)
loop.run_forever()
Dans le code ci-dessus, nous appelons ʻasyncio.get_event_loop pour obtenir l'objet
BaseEventLoop. Ensuite, la fonction
f est enregistrée dans
loop par
call_soon. Enfin, la boucle d'événement est exécutée avec
loop.run_forever ()`.
Quand je fais ça, je suis dans une boucle infinie avec run_forever ()
et le programme ne se termine jamais. Au lieu de cela, vous pouvez arrêter automatiquement la boucle d'événements après que la fonction f ()
ait fini de s'exécuter en écrivant:
res = loop.run_until_complete(future)
print(res)
Comment run_until_complete ()
peut-il connaître l'achèvement de la fonctionf ()
? Cela utilise un mécanisme appelé le callback future
.
Dans run_until_complete ()
, la fonction future.add_done_callback ()
est d'abord exécutée, et le rappel est défini dans future
. Ensuite, run_forever ()
est appelé et la fonction f ()
est exécutée. Ensuite, lorsque la valeur est définie par future.set_result ()
dans la fonction f ()
, le rappel défini par ʻadd_done_callback () est appelé. Dans le callback défini par
run_until_complete (),
loop.stop ()est utilisé pour réserver la fin de la boucle d'événements, donc la boucle d'événements s'arrêtera après la fin de l'exécution de
f ()`. ..
Notez que future.set_result ()
n'est pas exécutée et que la fonction f ()
n'est pas immédiatement terminée. La fin est uniquement réservée et l'exécution se poursuit en fait jusqu'à «return».
Lib/asyncio/events.py
import contextvars
class Handle:
def __init__(self, callback, args, loop):
self._context = contextvars.copy_context()
self._loop = loop
self._callback = callback
self._args = args
def _run(self):
self._context.run(self._callback, *self._args)
Lib/asyncio/base_events.py
class BaseEventLoop(events.AbstractEventLoop):
def __init__(self):
self._stopping = False
self._ready = collections.deque()
def _call_soon(self, callback, args, context):
handle = events.Handle(callback, args, self, context)
self._ready.append(handle)
return handle
def _run_once(self):
ntodo = len(self._ready)
for i in range(ntodo):
handle = self._ready.popleft()
if handle._cancelled:
continue
handle._run()
def run_forever(self):
while True:
self._run_once()
if self._stopping:
break
def run_until_complete(self, future):
def _run_until_complete_cb(fut):
self.stop()
future.add_done_callback(_run_until_complete_cb)
self.run_forever()
return future.result()
def stop(self):
self._stopping = True
Lib/asyncio/futures.py
class Future:
def add_done_callback(self, fn):
context = contextvars.copy_context()
self._callbacks.append((fn, context))
def set_result(self, result):
# ...réduction
for callback, ctx in self._callbacks[:]:
self._loop.call_soon(callback, self, context=ctx)
Dans le chapitre précédent, le traitement était exécuté à l'aide d'une boucle d'événements. Cependant, la seule chose qui a changé est que la fonction f
, qui effectue un traitement fastidieux, n'a pas été exécutée directement, mais a été exécutée via une boucle d'événements. Cela ne change pas ce que vous faites.
La vraie nature de la boucle d'événements entre en jeu lorsque plusieurs processus sont exécutés. Faisons-le réellement.
import asyncio
import time
def f(future, tag):
for _ in range(3):
time.sleep(1)
print("waiting for f(%d)" % tag)
future.set_result("hello %d" % tag)
return
loop = asyncio.get_event_loop()
futures = []
for tag in range(3):
future = loop.create_future()
loop.call_soon(f, future, tag)
futures += [future]
res = loop.run_until_complete(asyncio.gather(*futures))
print(res)
Ce code enregistre trois processus. Il utilise également une nouvelle fonction appelée ʻasyncio.gatherpour regrouper plusieurs
Future`s en un seul. Le résultat de cette exécution est le suivant.
waiting for f(0)
waiting for f(0)
waiting for f(0)
waiting for f(1)
waiting for f(1)
waiting for f(1)
waiting for f(2)
waiting for f(2)
waiting for f(2)
['hello 0', 'hello 1', 'hello 2']
Notez que comme vous pouvez le voir à partir de ce résultat, f (0)
, f (1)
, f (2)
ne fonctionnent pas en parallèle. Comme vous pouvez le voir dans le code source de la bibliothèque, dans loop.run_until_complete ()
, les callbacks enregistrés dans loop._ready
ne sont exécutés que séquentiellement.
Lib/asyncio/tasks.py
class _GatheringFuture(futures.Future):
def __init__(self, children, *, loop=None):
super().__init__(loop=loop)
self._children = children
self._cancel_requested = False
def gather(*coros_or_futures, loop=None, return_exceptions=False):
def _done_callback(fut):
nonlocal nfinished
nfinished += 1
if nfinished == nfuts:
results = []
for fut in children:
res = fut.result()
results.append(res)
outer.set_result(results)
arg_to_fut = {}
children = []
nfuts = 0
nfinished = 0
for arg in coros_or_futures:
nfuts += 1
fut.add_done_callback(_done_callback)
children.append(fut)
outer = _GatheringFuture(children, loop=loop)
return outer
Maintenant, déraillons et voyons le générateur Python. Un générateur est une "fonction qui renvoie un itérateur". L'exécution du générateur renvoie un objet générateur. L'objet générateur implémente la fonction «iter ()» qui représente un itérateur. Le générateur est mis en œuvre comme suit.
def generator():
yield 1
yield 2
yield 3
return "END"
gg = generator().__iter__()
print(gg.__next__())
print(gg.__next__())
print(gg.__next__())
try:
print(gg.__next__())
except StopIteration as e:
print(e.value)
Ici, «yield» a pour fonction d'arrêter temporairement le traitement du contenu du générateur. Les générateurs peuvent également être empilés en deux couches.
def generator2():
yield 1
yield 2
yield 3
return "END"
def generator():
a = yield from generator2()
return a
gg = generator().__iter__()
print(gg.__next__())
print(gg.__next__())
print(gg.__next__())
try:
print(gg.__next__())
except StopIteration as e:
print(e.value)
Les deux résultats d'exécution
1
2
3
END
Ce sera.
Comme mentionné dans le chapitre précédent, lors de l'exécution de plusieurs fonctions à l'aide de loop.run_until_complete
, la deuxième fonction est exécutée après l'exécution de la première fonction, et ainsi de suite. Les fonctions ne sont pas exécutées en parallèle, mais en séquence. Ici, si vous utilisez un générateur au lieu d'une fonction, ce sera comme suit.
import asyncio
import time
def f(tag):
for _ in range(3):
yield
time.sleep(1)
print("waiting for f(%d)" % tag)
return "hello %d" % tag
loop = asyncio.get_event_loop()
tasks = []
for tag in range(3):
task = f(tag)
tasks += [task]
res = loop.run_until_complete(asyncio.gather(*tasks))
print(res)
Ici, j'ai ajouté une instruction yield
dans la fonction f ()
et renvoyé le résultat du calcul comme return
au lieu de future.set_result
. L'argument «futur» n'est plus nécessaire et a été supprimé.
Le résultat de cette exécution est le suivant.
waiting for f(0)
waiting for f(1)
waiting for f(2)
waiting for f(0)
waiting for f(1)
waiting for f(2)
waiting for f(0)
waiting for f(1)
waiting for f(2)
['hello 0', 'hello 1', 'hello 2']
Dans le chapitre précédent, «f (0)» était affiché trois fois, puis «f (1)» était affiché, et ... a été changé en «f (0)», «f (1)». , f (2)
sont maintenant affichés dans cet ordre. En effet, même si plusieurs tâches sont enregistrées dans la boucle d'événements, elles sont toutes exécutées dans un thread. De plus, la boucle d'événements ne peut pas interrompre l'exécution d'une fonction Python, elle doit donc continuer à exécuter une fonction jusqu'à ce qu'elle s'arrête volontairement, comme par return
.
D'un autre côté, si vous utilisez un générateur, yield
suspendra l'exécution de la fonction. Puisque le traitement revient du côté de la boucle d'événements à ce moment, il est possible de commuter la tâche exécutée par la boucle d'événements.
À propos, le plus petit exemple d'utilisation d'un générateur est: (Cela n'a aucun sens d'en faire un générateur car il n'y a qu'une seule tâche ...)
import asyncio
import time
def f():
time.sleep(5) #Processus chronophage
yield
return "hello"
loop = asyncio.get_event_loop()
ret = loop.run_until_complete(f())
print(ret)
D'ailleurs, dans la version qui n'utilise pas le générateur, loop.call_soon ()
a été appelé et la fonctionf ()
a été enregistrée dans la boucle d'événements, mais ceux qui doutaient que cela n'ait pas été appelé dans ce chapitre. Je pense qu'il y en a beaucoup. Plus précisément, c'est comme suit.
Nom de la fonction | argument(Version future) | argument(Version du générateur) |
---|---|---|
f() |
future |
Aucun |
loop.call_soon() |
f |
-- |
loop.run_until_complete() |
future |
f |
Dans run_until_complete ()
, si l'argument donné est un objet générateur (obtenu en appelant la fonctionf ()
définie comme générateur), alors une instance Task
(une sous-classe de Future
) Générer. Call_soon ()
est appelé en interne au moment de cette génération.
<détails> Dans les exemples jusqu'à présent, «time.sleep ()» était fortement utilisé. Ceci est, bien sûr, pour illustrer le "traitement chronophage", mais vous voudrez peut-être réellement --Traitement de temporisation qui s'annule après avoir attendu un certain temps en tant que sous-traitement tout en effectuant une communication réseau dans le traitement principal
--Afficher la progression dans le sous-processus tout en effectuant des calculs chronophages dans le processus principal Cependant, dans un tel cas, Je souhaite attendre un certain temps dans une tâche, mais je souhaite renvoyer le processus à la boucle d'événements pendant le temps d'attente. Dans de tels cas, vous pouvez utiliser la fonction Ceci est une réécriture du processus dans le chapitre précédent en utilisant Cela peut être un peu plus compliqué. Par exemple, supposons que vous appelez une fonction dans une tâche et que cette fonction essaie de Vous avez peut-être remarqué que dans le code Pour des raisons techniques, l'instance Autrement dit, l'itération de L'expression sur le côté droit de ʻit_inner = future .__ iter__ () ʻest exécuté. Une autre raison politique est que nous voulions être capables de gérer le générateur (ou collout) et «Future» dans la même ligne. Ceci est également lié à ʻawait` dans le chapitre suivant. <détails> Le même code que dans le chapitre précédent peut être écrit comme suit en Python 3.7 ou version ultérieure.
(À proprement parler, il y a une légère différence entre le fait que celui utilisé dans ce chapitre est «coroutine» et celui utilisé dans le chapitre précédent est «générateur».) Dans ce format, vous pouvez utiliser ʻasyncio.sleep () De plus, si vous n'avez qu'une seule tâche, vous pouvez l'écrire encore plus facilement en utilisant ʻasyncio.run () `. <détails> Je rassemblais de tels articles dans le brouillon, mais comme d'autres personnes avaient publié des articles similaires, j'ai également décidé de publier (?) À la hâte.Lib/asyncio/base_events.py
class BaseEventLoop(events.AbstractEventLoop):
def run_until_complete(self, future):
future = tasks.ensure_future(future, loop=self)
future.add_done_callback(_run_until_complete_cb)
self.run_forever()
return future.result()
Lib/asyncio/tasks.py
def ensure_future(coro_or_future, loop):
if isinstance(coro_or_future, types.CoroutineType) or isinstance(coro_or_future, types.GeneratorType):
task = tasks.Task(coro_or_future, loop=loop)
return task
else:
return coro_or_future
class Task(futures.Future):
def __init__(self, coro, loop=None):
super().__init__(loop=loop)
self._coro = coro
self._loop = loop
self._context = contextvars.copy_context()
loop.call_soon(self.__step, context=self._context)
_register_task(self)
def __step(self, exc=None):
coro = self._coro
self._fut_waiter = None
try:
result = coro.send(None)
except StopIteration as exc:
super().set_result(exc.value)
else:
self._loop.call_soon(self.__step, context=self._context)
Effectuer d'autres tâches pendant le sommeil
sleep ()
pour des raisons pratiques. Par exempletime.sleep ()
ne peut pas être utilisé dans le sous-traitement. C'est parce qu'une fois que time.sleep ()
est exécuté dans le sous-processus, le processus principal ne peut pas être poursuivi pendant le sommeil, jusqu'à ce que time.sleep ()
se termine. C'est parce que le sous-marin continuera à occuper la boucle d'événements.loop.call_later ()
. Cette fonction exécute la fonction donnée après avoir attendu le nombre de secondes spécifié.
Vous pouvez utiliser cette propriété pour implémenter my_sleep ()
comme suit:import asyncio
import time
def my_sleep(delay):
def _cb_set_result(fut):
fut.set_result(None)
loop = asyncio.get_running_loop()
future = loop.create_future()
h = loop.call_later(delay, _cb_set_result, future)
yield from future
def f(tag):
for i in range(3):
yield from my_sleep(1)
print("waiting for f(%d)" % tag)
return "hello %d" % tag
loop = asyncio.get_event_loop()
tasks = [f(n) for n in range(3)]
ret = loop.run_until_complete(asyncio.gather(*tasks))
print(ret)
my_sleep ()
. Dans le chapitre précédent, nous avons attendu 3 secondes pour chacun des 3 processus, donc cela a pris un total de 9 secondes. Cependant, ce processus se termine en 3 secondes environ.my_sleep ()
. Dans ce cas, vous pouvez définir la fonction à appeler en tant que générateur comme suit.def g():
yield from my_sleep(10)
return "hello"
def f():
ret = yield from g()
return ret
loop = asyncio.get_event_loop()
ret = loop.run_until_complete(asyncio.gather(f()))
print(ret)
Pourquoi «rendement du futur» au lieu de «rendement du futur»?
my_sleep ()
décrit ci-dessus, la dernière ligne était yield from future
. J'ai utilisé yield
pour définir la valeur à renvoyer lorsque le générateur __next__ ()
est appelé. Au contraire, «yield from» a été spécifié lors de la spécification d'un autre itérateur. Pourquoi utilisez-vous yield from
pour renvoyer Future
, qui n'est pas un itérateur, juste une boîte à laquelle attribuer des résultats?Future
est en fait un itérateur! Future
implémente __iter__ ()
, et cette fonction ressemble à ceci:class Future:
#....
def __iter__(self):
yield self
my_sleep ()
ressemble à ceci:
yield from my_sleep (1)
est exécuté.
1.> Créez un objet générateur pour my_sleep
avec go = my_sleep (1)
2.> Générer un itérateur avec ʻit = go .__ iter__ () (c'est la même chose que
go) 3.>
res = it .__ next__ ()est exécuté pour obtenir le premier élément de
my_sleep4.> L'exécution du contenu de
my_sleep ()` démarre.
yield from
dans my_sleep () ʻest évaluée et
future` est générée.
res_inner = it_inner .__ next__ ()
est exécuté. C'est la même chose que «futur».
8.> res_inner
est la valeur de retour de ʻit .__ next__ (). Autrement dit,
res = future`Lib/asyncio/tasks.py
class Task(futures.Future):
def __step(self, exc=None):
coro = self._coro
try:
result = coro.send(None)
except StopIteration as exc:
super().set_result(exc.value)
elif result != None:
result.add_done_callback(self.__wakeup, context=self._context)
else:
self._loop.call_soon(self.__step, context=self._context)
def __wakeup(self, future):
self.__step()
Utilisez le mot-clé ʻasync
ʻawait
import asyncio
import time
async def f(tag):
for i in range(3):
await asyncio.sleep(1)
print("waiting for f(%d)" % tag)
return "hello %d" % tag
loop = asyncio.get_event_loop()
tasks = [f(n) for n in range(3)]
ret = loop.run_until_complete(asyncio.gather(*tasks))
print(ret)
au lieu de
my_sleep () `.import asyncio
import time
async def g():
await asyncio.sleep(10)
return "hello"
async def f():
return await g()
asyncio.run(f())
Lib/asyncio/tasks.py
async def sleep(delay, result=None):
loop = events.get_running_loop()
future = loop.create_future()
h = loop.call_later(delay,
futures._set_result_unless_cancelled,
future, result)
return await future
Lib/asyncio/runner.py
def run(main):
loop = events.new_event_loop()
return loop.run_until_complete(main)
à la fin