Future
When you perform a time-consuming calculation in Python and want its result, you usually wrap the work in a function and receive the result as the function's return value. This is called synchronous processing.
Asynchronous processing is a different concept. Here, the process that requests the calculation (the receiver) and the process that actually performs it (the sender) communicate through an object called a `Future`:
- (receiver) Create a `Future` object.
- (receiver) Start the sender by some means.
- (sender) Perform the time-consuming calculation and write the result into the `Future` object created by the receiver.
- (receiver) Check the `Future` object and, once the result has been stored, retrieve it.
Written as code, this flow looks like the following, for example.
```python
import asyncio
import time

def f(future):
    time.sleep(5)  # time-consuming process
    future.set_result("hello")
    return

future = asyncio.futures.Future()
f(future)

if future.done():
    res = future.result()
    print(res)
```
Running this prints "hello" after a 5-second wait.
Lib/asyncio/futures.py

```python
class Future:
    _state = _PENDING
    _result = None

    def done(self):
        return self._state != _PENDING

    def result(self):
        if self._state != _FINISHED:
            raise exceptions.InvalidStateError('Result is not ready.')
        return self._result

    def set_result(self, result):
        if self._state != _PENDING:
            raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
        self._result = result
        self._state = _FINISHED
```
As you have probably noticed, the code above behaves exactly like an ordinary function call; it just happens to pass a `Future` object around. That is because the receiver runs the sender's code directly, so the `Future` brings no benefit yet.
This is where the event loop comes in. An event loop is an object of which each thread holds at most one, and its job is to execute the functions registered with it.
Let's actually use it.
```python
import asyncio
import time

def f(future):
    time.sleep(5)  # time-consuming process
    future.set_result("hello")
    return

loop = asyncio.get_event_loop()
future = asyncio.futures.Future()
loop.call_soon(f, future)
loop.run_forever()
```
In the code above, we call `asyncio.get_event_loop()` to obtain a `BaseEventLoop` object. The function `f` is then registered with `loop` via `call_soon()`. Finally, the event loop is started with `loop.run_forever()`.
If you actually run this, `run_forever()` loops forever and the program never ends. To stop the event loop automatically once the function `f()` has finished, replace `loop.run_forever()` with the following:
```python
res = loop.run_until_complete(future)
print(res)
```
How can `run_until_complete()` know that the function `f()` has completed? It relies on the `Future`'s callback mechanism.
Inside `run_until_complete()`, `future.add_done_callback()` is called first, which registers a callback on `future`. Then `run_forever()` is called and the function `f()` is executed. When `future.set_result()` is called inside `f()`, the callback registered by `add_done_callback()` is scheduled. The callback that `run_until_complete()` registers calls `loop.stop()`, which reserves the end of the event loop, so the loop stops after `f()` has finished running.
Note that calling `future.set_result()` does not terminate the function `f()` on the spot. The stop is merely reserved; execution of `f()` continues until its `return`.
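To make this mechanism concrete, here is a minimal sketch (not the library's actual implementation) that reproduces `run_until_complete()` by hand: it registers a done-callback that stops the loop, then calls `run_forever()`.

```python
import asyncio
import time

def f(future):
    time.sleep(5)               # time-consuming process
    future.set_result("hello")  # schedules the done-callbacks via call_soon()

loop = asyncio.get_event_loop()
future = loop.create_future()
future.add_done_callback(lambda fut: loop.stop())  # reserve the stop of the loop
loop.call_soon(f, future)
loop.run_forever()              # returns once the reserved stop takes effect
print(future.result())
```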
Lib/asyncio/events.py

```python
import contextvars

class Handle:
    def __init__(self, callback, args, loop, context=None):
        if context is None:
            context = contextvars.copy_context()
        self._context = context
        self._loop = loop
        self._callback = callback
        self._args = args

    def _run(self):
        self._context.run(self._callback, *self._args)
```
Lib/asyncio/base_events.py

```python
class BaseEventLoop(events.AbstractEventLoop):
    def __init__(self):
        self._stopping = False
        self._ready = collections.deque()

    def _call_soon(self, callback, args, context):
        handle = events.Handle(callback, args, self, context)
        self._ready.append(handle)
        return handle

    def _run_once(self):
        ntodo = len(self._ready)
        for i in range(ntodo):
            handle = self._ready.popleft()
            if handle._cancelled:
                continue
            handle._run()

    def run_forever(self):
        while True:
            self._run_once()
            if self._stopping:
                break

    def run_until_complete(self, future):
        def _run_until_complete_cb(fut):
            self.stop()

        future.add_done_callback(_run_until_complete_cb)
        self.run_forever()
        return future.result()

    def stop(self):
        self._stopping = True
```
Lib/asyncio/futures.py

```python
class Future:
    def add_done_callback(self, fn):
        context = contextvars.copy_context()
        self._callbacks.append((fn, context))

    def set_result(self, result):
        # ...abridged...
        for callback, ctx in self._callbacks[:]:
            self._loop.call_soon(callback, self, context=ctx)
```
In the previous chapter we executed the work through an event loop, but the only change was that the time-consuming function `f` was no longer called directly; it was called via the event loop. What the program does is still the same.
The event loop shows its true value when you run multiple tasks. Let's try it.
```python
import asyncio
import time

def f(future, tag):
    for _ in range(3):
        time.sleep(1)
        print("waiting for f(%d)" % tag)
    future.set_result("hello %d" % tag)
    return

loop = asyncio.get_event_loop()
futures = []
for tag in range(3):
    future = loop.create_future()
    loop.call_soon(f, future, tag)
    futures += [future]
res = loop.run_until_complete(asyncio.gather(*futures))
print(res)
```
This code registers three pieces of work. It also uses a new function, `asyncio.gather()`, to bundle multiple `Future`s into one. Running it produces the following output.
```
waiting for f(0)
waiting for f(0)
waiting for f(0)
waiting for f(1)
waiting for f(1)
waiting for f(1)
waiting for f(2)
waiting for f(2)
waiting for f(2)
['hello 0', 'hello 1', 'hello 2']
```
As the output shows, `f(0)`, `f(1)` and `f(2)` do not run in parallel. As you can see in the library source, `loop.run_until_complete()` simply executes the callbacks registered in `loop._ready` one after another.
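As a quick check (a sketch of the same script as above, not part of the library), you can time the `run_until_complete()` call; since the three `f()` calls run back to back, the elapsed time is roughly 9 seconds rather than 3.

```python
import asyncio
import time

def f(future, tag):
    for _ in range(3):
        time.sleep(1)
        print("waiting for f(%d)" % tag)
    future.set_result("hello %d" % tag)

loop = asyncio.get_event_loop()
futures = []
for tag in range(3):
    future = loop.create_future()
    loop.call_soon(f, future, tag)
    futures.append(future)

start = time.monotonic()
res = loop.run_until_complete(asyncio.gather(*futures))
print(res)
print("elapsed: %.1f s" % (time.monotonic() - start))  # roughly 9 seconds
```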
Lib/asyncio/tasks.py

```python
class _GatheringFuture(futures.Future):
    def __init__(self, children, *, loop=None):
        super().__init__(loop=loop)
        self._children = children
        self._cancel_requested = False

def gather(*coros_or_futures, loop=None, return_exceptions=False):
    def _done_callback(fut):
        nonlocal nfinished
        nfinished += 1
        if nfinished == nfuts:
            results = []
            for fut in children:
                res = fut.result()
                results.append(res)
            outer.set_result(results)

    arg_to_fut = {}
    children = []
    nfuts = 0
    nfinished = 0
    for arg in coros_or_futures:
        fut = ensure_future(arg, loop=loop)
        nfuts += 1
        fut.add_done_callback(_done_callback)
        children.append(fut)

    outer = _GatheringFuture(children, loop=loop)
    return outer
```
Now let's take a short detour and look at Python generators.
A generator is a "function that returns an iterator". Calling a generator function returns a generator object, which implements the `__iter__()` method that characterizes an iterator. A generator looks like this:
```python
def generator():
    yield 1
    yield 2
    yield 3
    return "END"

gg = generator().__iter__()
print(gg.__next__())
print(gg.__next__())
print(gg.__next__())
try:
    print(gg.__next__())
except StopIteration as e:
    print(e.value)
```
Here `yield` suspends execution of the generator body. Generators can also be nested two levels deep:
```python
def generator2():
    yield 1
    yield 2
    yield 3
    return "END"

def generator():
    a = yield from generator2()
    return a

gg = generator().__iter__()
print(gg.__next__())
print(gg.__next__())
print(gg.__next__())
try:
    print(gg.__next__())
except StopIteration as e:
    print(e.value)
```
Both versions produce the same output:

```
1
2
3
END
```
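One more detail worth knowing before reading the library excerpts below, where `Task.__step` calls `coro.send(None)`: `gen.send(None)` resumes a generator just like `next(gen)`, and a non-`None` argument becomes the value of the paused `yield` expression. A quick illustration (my own example, not from the library):

```python
def generator():
    x = yield 1          # pauses here; x receives whatever is passed to send()
    print("received:", x)
    yield 2
    return "END"

g = generator()
print(g.send(None))      # same as next(g): runs to the first yield, prints 1
print(g.send("hi"))      # "hi" becomes the value of the first yield; prints "received: hi", then 2
try:
    g.send(None)
except StopIteration as e:
    print(e.value)       # the return value "END" travels inside StopIteration
```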
As mentioned above, when multiple functions are run with `loop.run_until_complete()`, the second function starts only after the first one has finished, and so on: the functions run in sequence, not in parallel. Now, if you use generators instead of plain functions, the picture changes:
```python
import asyncio
import time

def f(tag):
    for _ in range(3):
        yield
        time.sleep(1)
        print("waiting for f(%d)" % tag)
    return "hello %d" % tag

loop = asyncio.get_event_loop()
tasks = []
for tag in range(3):
    task = f(tag)
    tasks += [task]
res = loop.run_until_complete(asyncio.gather(*tasks))
print(res)
```
Here a `yield` statement was added inside `f()`, and the result is now returned with `return` instead of `future.set_result()`. The `future` argument is no longer needed and has been removed.
The result of this execution is as follows.
```
waiting for f(0)
waiting for f(1)
waiting for f(2)
waiting for f(0)
waiting for f(1)
waiting for f(2)
waiting for f(0)
waiting for f(1)
waiting for f(2)
['hello 0', 'hello 1', 'hello 2']
```
Previously, `f(0)` was printed three times, then `f(1)` three times, and so on; now `f(0)`, `f(1)`, `f(2)` are printed in turn. The reason is that even when multiple tasks are registered, the event loop runs them all in a single thread. Moreover, the event loop cannot pause an ordinary Python function from the outside, so it has to keep running a function until it gives up control voluntarily, for example by `return`ing.
With a generator, on the other hand, `yield` suspends the function. Control returns to the event loop at that point, so the loop can switch to another task.
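To see the switching principle in isolation, here is a hand-rolled sketch of a "scheduler" that resumes generators round-robin. It is only an illustration of the idea, not how asyncio is actually implemented:

```python
from collections import deque
import time

def f(tag):
    for _ in range(3):
        yield                      # hand control back to the scheduler
        time.sleep(1)
        print("waiting for f(%d)" % tag)
    return "hello %d" % tag

ready = deque(f(tag) for tag in range(3))
results = []
while ready:
    gen = ready.popleft()
    try:
        gen.send(None)             # resume until the next yield
        ready.append(gen)          # not finished yet: put it back in the queue
    except StopIteration as e:
        results.append(e.value)    # finished: its return value arrives here
print(results)
```

The output interleaves the three tasks just like the asyncio version above.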
Incidentally, the smallest example of using a generator this way is the following. (With only one task, making it a generator is pointless, but it shows the shape.)
```python
import asyncio
import time

def f():
    time.sleep(5)  # time-consuming process
    yield
    return "hello"

loop = asyncio.get_event_loop()
ret = loop.run_until_complete(f())
print(ret)
```
In the version without generators, `f()` was registered with the event loop by calling `loop.call_soon()`, so you may be wondering why that call no longer appears here. The differences are summarized below.
| Function name | Argument (Future version) | Argument (generator version) |
|---|---|---|
| `f()` | `future` | (none) |
| `loop.call_soon()` | `f` | -- |
| `loop.run_until_complete()` | `future` | `f()` |
Inside `run_until_complete()`, if the argument is a generator object (obtained by calling the generator function `f()`), a `Task` instance (a subclass of `Future`) is created from it, and `call_soon()` is called internally during that creation.
Lib/asyncio/base_events.py

```python
class BaseEventLoop(events.AbstractEventLoop):
    def run_until_complete(self, future):
        future = tasks.ensure_future(future, loop=self)
        future.add_done_callback(_run_until_complete_cb)
        self.run_forever()
        return future.result()
```
Lib/asyncio/tasks.py

```python
def ensure_future(coro_or_future, loop):
    if isinstance(coro_or_future, types.CoroutineType) or isinstance(coro_or_future, types.GeneratorType):
        task = Task(coro_or_future, loop=loop)
        return task
    else:
        return coro_or_future

class Task(futures.Future):
    def __init__(self, coro, loop=None):
        super().__init__(loop=loop)
        self._coro = coro
        self._loop = loop
        self._context = contextvars.copy_context()
        loop.call_soon(self.__step, context=self._context)
        _register_task(self)

    def __step(self, exc=None):
        coro = self._coro
        self._fut_waiter = None
        try:
            result = coro.send(None)
        except StopIteration as exc:
            super().set_result(exc.value)
        else:
            self._loop.call_soon(self.__step, context=self._context)
```
The examples so far made heavy use of `time.sleep()`. That was, of course, just a stand-in for "time-consuming processing", but there are also practical reasons to sleep, for example:

- timeout handling: while the main task does network communication, a sub-task waits a fixed time and then cancels it
- progress reporting: while the main task runs a long calculation, a sub-task periodically displays progress
In such cases, however, `time.sleep()` cannot be used in the sub-task. Once `time.sleep()` starts in the sub-task, the main task cannot make progress until the sleep ends, because the sub-task keeps occupying the event loop.
What we want is to wait for a certain time inside a task while giving control back to the event loop during the wait. For this you can use `loop.call_later()`, which runs a given function after a specified number of seconds. Using this, `my_sleep()` can be implemented as follows:
```python
import asyncio
import time

def my_sleep(delay):
    def _cb_set_result(fut):
        fut.set_result(None)

    loop = asyncio.get_running_loop()
    future = loop.create_future()
    h = loop.call_later(delay, _cb_set_result, future)
    yield from future

def f(tag):
    for i in range(3):
        yield from my_sleep(1)
        print("waiting for f(%d)" % tag)
    return "hello %d" % tag

loop = asyncio.get_event_loop()
tasks = [f(n) for n in range(3)]
ret = loop.run_until_complete(asyncio.gather(*tasks))
print(ret)
```
This is the earlier example rewritten with `my_sleep()`. Before, the three tasks each waited 3 seconds one after another, for about 9 seconds in total; this version finishes in roughly 3 seconds.
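If you want to verify the timing yourself, here is a small sketch that wraps the same call with a timer. It assumes the `my_sleep()` and `f()` definitions from the listing above, and a Python version that still accepts generator-based tasks (the same assumption the listing itself makes):

```python
start = time.monotonic()
ret = loop.run_until_complete(asyncio.gather(*(f(n) for n in range(3))))
print(ret)
print("elapsed: %.1f s" % (time.monotonic() - start))  # roughly 3 s, not 9 s
```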
Things can get a little more involved. Suppose, for example, that a task calls another function and that function wants to `my_sleep()`. In that case, simply define the called function as a generator as well:
```python
def g():
    yield from my_sleep(10)
    return "hello"

def f():
    ret = yield from g()
    return ret

loop = asyncio.get_event_loop()
ret = loop.run_until_complete(asyncio.gather(f()))
print(ret)
```
Why `yield from future` instead of `yield future`?
You may have noticed that the last line of the `my_sleep()` code above is `yield from future`. So far, `yield` was used to produce the value returned by the generator's `__next__()`, while `yield from` was used to delegate to another iterator. Why use `yield from` with a `Future`, which looks like nothing more than a box to put a result in, not an iterator?
For a technical reason, a `Future` instance actually is iterable: `Future` implements `__iter__()`, which looks like this:
```python
class Future:
    # ...
    def __iter__(self):
        yield self
```
So when `yield from my_sleep(1)` is executed, the iteration proceeds roughly like this:

1. A generator object is created: `go = my_sleep(1)`.
2. An iterator is obtained with `it = go.__iter__()` (this is the same object as `go`).
3. `res = it.__next__()` is executed to fetch the first element of `my_sleep`.
4. Execution of the body of `my_sleep()` begins.
5. The expression on the right-hand side of `yield from` inside `my_sleep()` is evaluated, producing `future`.
6. `it_inner = future.__iter__()` is executed.
7. `res_inner = it_inner.__next__()` is executed; the value yielded is `future` itself.
8. `res_inner` becomes the return value of `it.__next__()`; that is, `res = future`.
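You can check this behaviour directly. The following sketch (my own check, not from the library excerpts) shows that iterating a pending `Future` first yields the `Future` object itself:

```python
import asyncio

loop = asyncio.get_event_loop()
fut = loop.create_future()
it = iter(fut)             # uses Future.__iter__()
print(next(it) is fut)     # True: the first yielded value is the Future itself
```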
There is also a more practical reason: this makes it possible to handle generators (or coroutines) and `Future`s uniformly on the same line. That also connects to `await` in the next chapter.
Lib/asyncio/tasks.py

```python
class Task(futures.Future):
    def __step(self, exc=None):
        coro = self._coro
        try:
            result = coro.send(None)
        except StopIteration as exc:
            super().set_result(exc.value)
        else:
            if result is not None:
                # the coroutine yielded a Future: wake up when it is done
                result.add_done_callback(self.__wakeup, context=self._context)
            else:
                # bare yield: simply reschedule ourselves
                self._loop.call_soon(self.__step, context=self._context)

    def __wakeup(self, future):
        self.__step()
```
The `await` keyword
The same code as in the previous chapter can be written as follows in Python 3.7 or later.
(Strictly speaking there is a small difference: this chapter uses coroutines, while the previous chapter used generators.)
```python
import asyncio
import time

async def f(tag):
    for i in range(3):
        await asyncio.sleep(1)
        print("waiting for f(%d)" % tag)
    return "hello %d" % tag

loop = asyncio.get_event_loop()
tasks = [f(n) for n in range(3)]
ret = loop.run_until_complete(asyncio.gather(*tasks))
print(ret)
```
In this form you can use `asyncio.sleep()` instead of `my_sleep()`.
Also, if you have only one task, `asyncio.run()` makes it even simpler:
```python
import asyncio
import time

async def g():
    await asyncio.sleep(10)
    return "hello"

async def f():
    return await g()

asyncio.run(f())
```
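As a final check, combining `asyncio.run()` with `asyncio.gather()` (my own example, not one of the listings above) confirms that the three tasks again finish in about 3 seconds:

```python
import asyncio
import time

async def f(tag):
    for _ in range(3):
        await asyncio.sleep(1)
        print("waiting for f(%d)" % tag)
    return "hello %d" % tag

async def main():
    return await asyncio.gather(*(f(n) for n in range(3)))

start = time.monotonic()
print(asyncio.run(main()))
print("elapsed: %.1f s" % (time.monotonic() - start))  # about 3 seconds
```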
Lib/asyncio/tasks.py

```python
async def sleep(delay, result=None):
    loop = events.get_running_loop()
    future = loop.create_future()
    h = loop.call_later(delay,
                        futures._set_result_unless_cancelled,
                        future, result)
    return await future
```
Lib/asyncio/runners.py

```python
def run(main):
    loop = events.new_event_loop()
    return loop.run_until_complete(main)
```
I had been collecting material like this as a draft, but since other people published similar articles, I decided to hurry up and publish mine as well.