The coroutine concept in Python is based on generators

In Python, the coroutine concept is actually based on generators.

The following is an example:

def counter():
    i = 0
    while i < 9:
        yield i
        i += 1

g = counter()
x = next(g)
print(x)

yield acts like a pause sign, and g is a generator object. The first call to next() on g "primes" it: the body runs until it reaches a yield and pauses there, and the value of i is passed to g's caller. In this case, x is assigned that value. The generator object stays alive until it reaches the end of its code block or a return statement, at which point it raises a StopIteration exception. An explicit return is therefore optional in a generator: its effect is to raise StopIteration to the caller, with any returned value attached to the exception. In that sense a generator object acts like a lightweight thread. It keeps its context, such as local variables, is suspended when it hits yield, and is terminated when it hits return.
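To make the lifecycle concrete, here is a minimal sketch. The name countdown is made up for illustration. It shows that local state survives between next() calls and that a returned value travels on the StopIteration exception:

```python
def countdown(n):
    # The local variable n survives across suspensions.
    while n > 0:
        yield n
        n -= 1
    return "done"  # attached to StopIteration as .value

g = countdown(2)
first = next(g)    # runs to the first yield
second = next(g)   # resumes after the yield and loops once more
try:
    next(g)        # the loop ends and the generator returns
except StopIteration as stop:
    final = stop.value

print(first, second, final)
```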

With generators, a simple coroutine concept can be implemented. The caller uses next() to tell the generator to resume and uses try/except to catch StopIteration as the sign that the coroutine has terminated.
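A rough sketch of such a caller might look like the following. The generator worker and the list log are hypothetical names used only to record what happens:

```python
def worker():
    # Hypothetical coroutine-like generator: each yield is a pause point.
    log.append("step 1")
    yield
    log.append("step 2")
    yield
    log.append("step 3")

log = []
w = worker()
resumes = 0
while True:
    try:
        next(w)        # resume until the next yield
        resumes += 1
    except StopIteration:
        # The generator ran off the end of its body.
        log.append("terminated")
        break

print(log, resumes)
```

The loop only counts a resume when next() returns normally, so the final call that triggers StopIteration is not counted.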

Important facts here:

yield in a generator suspends execution and passes a value out to the caller; return in a generator raises a StopIteration exception to signal termination; next() in the caller resumes the generator.

Communication between the generator and its caller goes both ways, but it is limited. The caller can resume the generator with next(), but it cannot pass values into it. Nor can a scheduler terminate the generator, or raise an exception inside it, before it finishes naturally.

Python adds the send(value), throw(type), and close() methods to generators so that a scheduler has more control over them.
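As a small illustration of the two control methods, the sketch below injects an exception with throw() and then shuts the generator down with close(). The generator guarded and the list events are hypothetical names:

```python
def guarded():
    # Hypothetical generator that reacts to exceptions injected by its caller.
    try:
        while True:
            try:
                yield "working"
            except ValueError:
                yield "recovered"
    finally:
        events.append("cleanup")

events = []
g = guarded()
a = next(g)                      # pauses at yield "working"
b = g.throw(ValueError("boom"))  # ValueError is raised at the paused yield
g.close()                        # GeneratorExit is raised at the paused yield

print(a, b, events)
```

throw() returns the next value the generator yields after handling the exception, and close() lets the finally block run before the generator is discarded.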

send(None) is equivalent to next(): the paused yield expression evaluates to None and the generator resumes. send(value) delivers value to the generator at the yield where it last stopped. So if the generator was suspended at x = yield 1, x is assigned value when it resumes. A generator that has not started yet only accepts send(None), because there is no paused yield to receive a value. yield now acts as a bidirectional communication channel: it passes its expression out to the caller when it suspends the generator, and it takes on the value from the caller's send() when execution resumes.
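The two directions of yield can be seen in a short sketch. The generator accumulator is a hypothetical example that keeps a running total of whatever is sent in:

```python
def accumulator():
    # Hypothetical example: keeps a running total of the values sent in.
    total = 0
    while True:
        value = yield total   # pass total out, then wait for the next value
        if value is not None:
            total += value

acc = accumulator()
start = acc.send(None)   # primes the generator, same as next(acc)
after_5 = acc.send(5)    # the paused yield evaluates to 5
after_7 = acc.send(7)    # the paused yield evaluates to 7
same = next(acc)         # the paused yield evaluates to None

print(start, after_5, after_7, same)
```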

Differences between generator and coroutine

A generator's caller wants to use each value produced by yield. A generator-based coroutine is different: here yield is used to pause execution and keep the context for the next resumption. Its caller generally cares about the final result and about the pausing and resuming done through yield.

Coroutine programming model

A useful abstraction of asyncio-style programming involves several concepts:

  1. an async producer, such as an I/O operation, that takes a long time
  2. an async consumer function that waits for the result of the async producer and post-processes the data
  3. a task that wraps the async consumer, queries its status using send(None), and tells the scheduler whether the async producer is suspended or terminated
  4. a scheduler (event loop) that gets the status of each async consumer through its task and schedules the next async consumer to resume or start

Another example:

# async producer; "result from coroutine" is the result the async consumer actually wants.
def counter():
    i = 0
    while i < 2:
        yield i
        i = i+1
        print(f"resume {i}")
    yield "result from coroutine"

# async consumer.
def filter():
    z = 0
    for x in counter():
        z = yield x
    # try to post process the final results from counter()
    print(f"filter1 process {z}")

def filter2():
    z = yield from counter()
    # try to post process the final results from counter()
    print(f"filter2 process {z}")

def filter3():
    z = 0
    for x in counter():
        z = x
        yield
    # try to post process the final results from counter()
    print(f"filter3 process {z}")

def filter4():
    z = 0
    t = counter()
    while True:
        try:
            z = t.send(None)
            yield
        except StopIteration:
            # z already holds targeted result from counter()
            break
    # try to post process the final results from counter()
    print(f"filter4 process {z}")

class task:
    def __init__(self, coro):
        self.coro = coro
        self.started = True
        self.finished = False

    def run(self):
        try:
            self.coro.send(None)
        except StopIteration:
            self.finished = True
        return self.finished

t1 = task(filter())
t2 = task(filter2())
t3 = task(filter3())
t4 = task(filter4())

# scheduler
task_queue = [t1, t2, t3, t4]
while task_queue:
    cur = task_queue.pop(0)
    if not cur.run():
        task_queue.append(cur)

In the example above, task is just a wrapper around an async consumer. It hides the details of communicating with the async consumer through send(None) and reports the consumer's status to the scheduler. The scheduler is just a while loop that runs tasks. It terminates when all tasks are finished.

The difficult part is the async consumer. It needs to check the status of the async producer and process the data the producer returns. filter() and filter2() handle bidirectional communication well, but they cannot capture the last yielded data. In filter(), as a side effect of send(value), z always ends up as the value sent by the caller, which is None here. In filter2(), z is the return value of counter(), which is also None because counter() yields its result instead of returning it. To make filter() work, the task would need to cache the last yielded value and send it back with send(value). That is not an elegant solution, because we don't want the task to take part in data processing or to pass interim data back and forth. To isolate the task from data processing, filter3() and filter4() are the better solutions.
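The caching workaround can be sketched as follows. The drivers run_plain and run_with_echo are hypothetical stand-ins for the task, and consumer mirrors filter() except that it returns z instead of printing it:

```python
def counter():
    i = 0
    while i < 2:
        yield i
        i += 1
    yield "result from coroutine"

def consumer():
    z = None
    for x in counter():
        z = yield x      # z becomes whatever the driver sends back
    return z

def run_plain(coro):
    # Hypothetical driver that always sends None, like task.run().
    while True:
        try:
            coro.send(None)
        except StopIteration as stop:
            return stop.value

def run_with_echo(coro):
    # Hypothetical driver that caches the last yielded value and echoes it back.
    last = None
    while True:
        try:
            last = coro.send(last)
        except StopIteration as stop:
            return stop.value

plain_result = run_plain(consumer())
echo_result = run_with_echo(consumer())
print(plain_result, echo_result)
```

The echoing driver recovers the result, but only by shuttling every interim value through itself, which is the coupling the text argues against.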

I guess this is partly why Python has moved away from using yield from for coroutines.

async/await is now the standard way to write coroutines, though await is not the same as yield from:

  1. await supports bidirectional communication, like yield from.
  2. await can capture the final result without special treatment in the task.
  3. await can capture the expression returned by return.

await is more like:

while True:
    try:
        z = t.send(None)
        yield
    except StopIteration as stop:
        # the inner coroutine has returned; its return value is stop.value
        z = stop.value
        break

Bullet 3 is the most important. By carefully passing the final result with return, a coroutine spares its caller from knowing any details about the lower-level coroutine.

With async/await, it is now more convenient to isolate the async producer, async consumer, task, and scheduler. The async producer is the only one that needs yield to implement the asynchronous function. The async consumer calls the async producer with await and does not have to handle the communication between the task and the async producer itself. The task is a wrapper around the async consumer and hides the details of querying it with send(None). The scheduler is the event loop that checks the statuses of multiple tasks and schedules them to run.

The async producer lives in a lower-level async library, and the task and event loop are implemented in the asyncio library. End users only need to care about the processing done in the async consumer, and await is the only construct they need to know. If end users don't know the underlying yield mechanism, they will naturally use return to pass results to the next async consumer in the chain, which avoids many pitfalls.

So to understand coroutine programming, it is better either to stop at async/await or to dive deeply into the concepts of yield, send, and so on. The latter is generally more painful.

The following code shows another example of a coroutine. It is recommended to comment out the last line of gen() and compare the results to understand the effects of yield, await, and send(None).
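From the end user's side, this might look like the sketch below. The functions fetch, consumer, and main are hypothetical, and asyncio.sleep stands in for a real async producer such as network I/O:

```python
import asyncio

async def fetch(delay, label):
    # Stand-in for a real async producer; asyncio.sleep plays the slow I/O.
    await asyncio.sleep(delay)
    return f"data from {label}"

async def consumer(label):
    # The consumer only awaits and post-processes; no yield or send in sight.
    raw = await fetch(0.01, label)
    return raw.upper()

async def main():
    # gather runs both consumers concurrently and keeps the argument order.
    return await asyncio.gather(consumer("a"), consumer("b"))

results = asyncio.run(main())
print(results)
```

Each layer hands its result upward with return, so nothing here needs to know how the event loop drives the coroutines.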

import asyncio
from types import coroutine

@coroutine
def gen():
    print( "before gen" )
    yield "gen"
    print( "after gen" )
    yield "final"
    return "finally finished"

async def filter():
    print ( "before filter" )
    x = await gen()
    print(f"filter {x}")
    print( "after filter" )
    return f"filter {x}"

async def channel():
    print( "before channel" )
    y = await filter()
    print( "after channel" )
    print(f"channel {y}")

test = channel()
while True:
    try:
        print( "******************" )
        print( "begin to send None" )
        print( "******************" )
        print("yield result:" + test.send(None))
    except StopIteration as err:
        print( "chained channel stopped" )
        break
