In Python, the coroutine concept is actually built on top of generators. Here is an example:
```python
def counter():
    i = 0
    while i < 9:
        yield i
        i += 1

g = counter()
x = next(g)
print(x)
```
`yield` acts like a pause sign. `g` is a generator object. When `next()` is called on `g` for the first time, it "primes" `g` and runs it until it pauses at a `yield`. The yielded `i` is handed to `g`'s caller; in this case, `x` is assigned the yielded `i`. The generator object is not finished until it reaches the end of its code block or a `return` expression, at which point it raises a `StopIteration` exception. So `return` is not necessary in a generator, because its only function there is to raise `StopIteration` to the caller. In this sense a generator object acts like a thread: it stores context such as local variables, can be suspended when it hits `yield`, and is terminated when it hits `return`.
With generators, a simple coroutine concept is implementable. The caller uses `next()` to tell the generator to resume, and uses `try:`/`except` to catch `StopIteration` as a sign of coroutine termination.
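A minimal sketch of that pattern (the generator name `steps` is made up for illustration): the caller drives the generator with `next()` and treats `StopIteration` as the termination signal, picking up the return value from the exception.

```python
def steps():
    yield "step 1"
    yield "step 2"
    return "all done"          # raises StopIteration("all done") to the caller

g = steps()
while True:
    try:
        print(next(g))         # resume the generator until the next yield
    except StopIteration as stop:
        print("terminated:", stop.value)
        break
```

The return value rides along on the `StopIteration` exception's `value` attribute, which is exactly the mechanism `await` later relies on.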
Important facts here:
- `yield` in a generator suspends execution and passes an internal value to its caller.
- `return` in a generator raises a `StopIteration` exception as a notification of termination.
- `next()` in the caller resumes the generator.
The communication between the generator and its caller is mutual, but not sufficient. The caller can resume the generator with `next()`, but it cannot pass parameters to the generator. If a scheduler wants to terminate the generator, or raise an exception in it before it finishes naturally, it cannot do that. Python therefore adds the `send(value)`, `throw(type)`, and `close()` methods to generators so that a scheduler can have more control over them.
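A small sketch of `throw()` and `close()` (the generator name `job` is made up): `throw()` raises the given exception inside the generator at the paused `yield`, and `close()` raises `GeneratorExit` there, so `finally` blocks still run.

```python
def job():
    try:
        while True:
            yield "working"
    except ValueError:
        print("caught ValueError injected by the caller")
    finally:
        print("cleaning up")

j = job()
print(next(j))           # prime: prints "working", pauses at the yield
try:
    j.throw(ValueError)  # raise ValueError inside job at the paused yield
except StopIteration:
    print("job finished after handling the exception")

j2 = job()
next(j2)
j2.close()               # raises GeneratorExit inside j2; finally still runs
```

Note that after `job` handles the thrown exception and falls off the end, `throw()` itself raises `StopIteration` to the caller, just like a normal termination.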
`send(None)` is equivalent to `next()`: the caller replaces the `yield` expression with `None` and tells the generator to resume execution. `send(value)` delivers `value` to the generator at the `yield` where it stopped last time. So if the generator was suspended at `x = yield 1`, `x` will be assigned `value` when the generator resumes. `yield` now acts like a bidirectional communication proxy: it passes an expression to the caller when it suspends the generator, and it accepts the value from the caller's `send()` when execution resumes.
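The two directions can be seen in one small example (the generator name `adder` is made up): each `send(value)` both delivers a value into the paused `yield` and carries the next yielded value back out.

```python
def adder():
    total = 0
    while True:
        x = yield total   # pause: hand total to the caller, wait for send()
        total += x

a = adder()
print(a.send(None))   # prime: runs to the first yield, prints 0
print(a.send(2))      # x = 2 inside adder, prints 2
print(a.send(3))      # x = 3 inside adder, prints 5
```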
Differences between generators and coroutines
A generator's caller wants to make use of each value produced by `yield`. A generator-based coroutine is different: here `yield` is used to pause execution and store the context for the next resumption. Its caller generally cares about the final result, and about pausing/resuming execution via `yield`.
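To make the contrast concrete (a minimal sketch; the names `squares` and `stepper` are made up): a plain generator is consumed for its values, while a generator-based coroutine is driven to its next pause point and the yielded values are ignored.

```python
# plain generator: the caller consumes every yielded value
def squares():
    for n in range(3):
        yield n * n

print(list(squares()))    # [0, 1, 4]

# generator-based coroutine: yield only marks pause points; the caller
# drives it with next() and discards the yielded values
def stepper():
    print("step 1")
    yield
    print("step 2")
    yield

s = stepper()
next(s)    # prints "step 1", pauses at the first yield
next(s)    # prints "step 2", pauses at the second yield
```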
Coroutine programming model
A good abstraction of asyncio programming contains several concepts:
- an async producer, such as an I/O operation that takes quite a long time
- an async consumer function that waits for the result of the async producer and does some post-processing on the data
- a task that wraps the async consumer, queries its status using send(None), and tells the scheduler whether the async consumer is suspended or terminated
- a scheduler (event loop) that gets the status of each async consumer through its task and schedules the next async consumer to resume or start
Another example:
```python
# async producer; "result from coroutine" is the result the async
# consumer actually wants.
def counter():
    i = 0
    while i < 2:
        yield i
        i = i + 1
        print(f"resume {i}")
    yield "result from coroutine"

# async consumers.
def filter():
    z = 0
    for x in counter():
        z = yield x
    # try to post-process the final result from counter()
    print(f"filter1 process {z}")

def filter2():
    z = yield from counter()
    # try to post-process the final result from counter()
    print(f"filter2 process {z}")

def filter3():
    z = 0
    for x in counter():
        z = x
        yield
    # try to post-process the final result from counter()
    print(f"filter3 process {z}")

def filter4():
    z = 0
    t = counter()
    while True:
        try:
            z = t.send(None)
            yield
        except StopIteration:
            # z already holds the targeted result from counter()
            break
    # try to post-process the final result from counter()
    print(f"filter4 process {z}")

class task:
    def __init__(self, coro):
        self.coro = coro
        self.started = True
        self.finished = False

    def run(self):
        try:
            self.coro.send(None)
        except StopIteration:
            self.finished = True
        return self.finished

t1 = task(filter())
t2 = task(filter2())
t3 = task(filter3())
t4 = task(filter4())

# scheduler
task_queue = [t1, t2, t3, t4]
while task_queue:
    cur = task_queue.pop(0)
    if not cur.run():
        task_queue.append(cur)
```
In the example above, `task` is just a wrapper around an async consumer. It hides the detailed communication between the task and the async consumer behind `send(None)`, and returns the status of the async consumer to the scheduler. The scheduler is just a while loop that runs the tasks; it terminates when all tasks are finished.
The difficult part is the async consumer. It needs to check the status of the async producer and try to process the data the producer returns. `filter()` and `filter2()` handle the bidirectional communication well, but due to the side effect of `send(value)`, they cannot capture the last yielded data: the last data they receive is always the `value` sent by the caller's `send(value)`. To make them work, we would need to cache the value yielded last time and send it back with `send(value)`. That is not an elegant solution, because we don't want the task to take care of any data processing or to pass interim data back and forth. To isolate the task from data processing, `filter3()` and `filter4()` are the better solutions.
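The caching workaround described above can be sketched as follows. This restates simplified versions of `counter()` and the first consumer (renamed `filter1` here); `caching_task` is a hypothetical task variant that remembers the last yielded value and sends it back, so `z = yield x` receives `x` instead of `None`:

```python
# simplified async producer from the example above (prints removed)
def counter():
    i = 0
    while i < 2:
        yield i
        i = i + 1
    yield "result from coroutine"

# consumer whose `z = yield x` is overwritten by whatever the task sends
def filter1():
    z = 0
    for x in counter():
        z = yield x
    print(f"filter1 process {z}")

class caching_task:
    """Hypothetical workaround: echo the last yielded value back into
    the coroutine instead of sending None."""
    def __init__(self, coro):
        self.coro = coro
        self.last = None
        self.finished = False

    def run(self):
        try:
            self.last = self.coro.send(self.last)
        except StopIteration:
            self.finished = True
        return self.finished

t = caching_task(filter1())
while not t.run():
    pass   # ends by printing "filter1 process result from coroutine"
```

It works, but only because the task now shuttles the consumer's interim data back and forth, which is exactly the coupling the article argues against.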
I guess this is partially the reason why Python now wants to abandon the use of `yield from` in coroutines. `async`/`await` is the proposed way to write coroutines now. `await` is not the same as `yield from`, though:
1. `await` can do bidirectional communication like `yield from`.
2. `await` can capture the final result without special treatment in the task.
3. `await` can capture the expression returned by `return`.
`await` behaves more like:

```python
while True:
    try:
        z = t.send(None)
        yield
    except StopIteration as stop:
        # the final result arrives on the StopIteration exception
        z = stop.value
        break
```
Point 3 is the most important: by carefully passing final results with `return`, the caller does not need to know the details of lower-level coroutines.
With `async`/`await`, it is now more convenient to isolate the async producer, async consumer, task, and scheduler. The async producer is the only one that needs `yield` to incorporate an async function. The async consumer can call the async producer with `await` and bypass the communication between the task and the async producer. The task is a wrapper around the async consumer that hides the details of querying it with `send(None)`. The scheduler is the event loop that checks the statuses of multiple tasks and schedules their execution.
The async producer belongs to the lower-level async library, while tasks and the event loop are implemented in the asyncio library. End users only need to care about doing some processing in the async consumer; `await` is the only thing they need to know. Even if end users don't know the underlying mechanism of `yield`, they can use `return` to pass results to cascaded async consumers. That avoids many pitfalls.
So to understand coroutine programming, it is better to just stop at `async`/`await`; otherwise you have to dive deeply into the concepts of `yield`, `send()`, and so on, which is generally more painful.
The following code shows another example of a coroutine. It is recommended to comment out the last line in `gen()` and compare the results to understand the effects of `yield`/`await`/`send(None)`.
```python
from types import coroutine

@coroutine
def gen():
    print("before gen")
    yield "gen"
    print("after gen")
    yield "final"
    return "finally finished"

async def filter():
    print("before filter")
    x = await gen()
    print(f"filter {x}")
    print("after filter")
    return f"filter {x}"

async def channel():
    print("before channel")
    y = await filter()
    print("after channel")
    print(f"channel {y}")

test = channel()
while True:
    try:
        print("******************")
        print("begin to send None")
        print("******************")
        print("yield result: " + test.send(None))
    except StopIteration:
        print("chained channel stopped")
        break
```