References:

https://docs.python.org/3/library/asyncio-eventloop.html

https://docs.python.org/3/library/asyncio-task.html

https://www.aeracode.org/2018/02/19/python-async-simplified/

https://github.com/django/asgiref/blob/master/asgiref/sync.py#L88

https://stackoverflow.com/questions/41063331/how-to-use-asyncio-with-existing-blocking-library

https://github.com/balkierode/assortedscripts/blob/master/python/blockex.py

https://www.4async.com/2016/02/simple-implement-asyncio-to-understand-how-async-works/

https://github.com/dabeaz/curio

Mechanism of coroutine

https://realpython.com/async-io-python

https://snarky.ca/how-the-heck-does-async-await-work-in-python-3-5

https://www.python.org/dev/peps/pep-0342

https://pymotw.com/3/asyncio/coroutines.html

https://realpython.com/python-concurrency

https://training.talkpython.fm/courses/explore_async_python/async-in-python-with-threading-and-multiprocessing

https://docs.python.org/3/reference/datamodel.html#coroutines

http://www.dabeaz.com/coroutines

A brief introduction to coroutines in Python

Starting with version 3.3, Python began adding coroutine features. The idea behind coroutines is to run multiple tasks in a single thread, avoiding the context-switching cost of multi-threaded programming.

Inside the thread, an event loop manages all the tasks that coroutines schedule.
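To make the single-thread claim concrete, here is a minimal sketch in which two tasks interleave on one event loop. The names worker and log are made up for this illustration.

```python
import asyncio
import threading


async def worker(name, delay, log):
    # Record which OS thread runs this task, then yield to the loop.
    log.append((name, "start", threading.get_ident()))
    await asyncio.sleep(delay)  # suspends this task; the loop runs others
    log.append((name, "end", threading.get_ident()))


async def main():
    log = []
    # Both workers are scheduled on the same event loop.
    await asyncio.gather(worker("a", 0.2, log), worker("b", 0.05, log))
    return log


if __name__ == "__main__":
    for entry in asyncio.run(main()):
        print(entry)
```

Both tasks report the same thread id, and "b" finishes before "a" even though it started later, because the loop switches tasks at each await.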

In older versions, yield from was used to write coroutines. Since Python 3.5, async/await is the recommended syntax.
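For comparison, the older generator-based style can be sketched without asyncio at all. The drive function below is a hypothetical stand-in for an event loop. It only shows how yield from passes values between a driver and a nested generator.

```python
def child():
    # A generator-based coroutine: `yield` hands control back to the driver
    # and later receives whatever the driver sends in.
    received = yield "child ready"
    return f"child got {received!r}"


def parent():
    # `yield from` delegates to child() and evaluates to its return value.
    result = yield from child()
    return result.upper()


def drive(gen, value):
    """Hypothetical mini driver: run to the first yield, then send a value."""
    first = next(gen)
    try:
        gen.send(value)
    except StopIteration as stop:
        return first, stop.value
    raise RuntimeError("generator yielded more than once")


if __name__ == "__main__":
    print(drive(parent(), 42))
```

With async/await, the event loop plays the role of drive, and await replaces yield from.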

await waits for the result of an awaitable object such as a coroutine, a Future, or a Task. It can only be used inside a coroutine.

async is used to define a coroutine, like:

async def test():
    await awaitable_obj
    ...
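As a small illustration of the three kinds of awaitables, the sketch below awaits a coroutine, a Task, and a bare Future in turn. The compute function is invented for the example.

```python
import asyncio


async def compute(x):
    await asyncio.sleep(0)  # yield to the loop once
    return x * 2


async def main():
    loop = asyncio.get_running_loop()

    # 1. Await a coroutine object directly.
    from_coro = await compute(1)

    # 2. Wrap a coroutine in a Task; the loop schedules it right away.
    task = asyncio.create_task(compute(2))
    from_task = await task

    # 3. Await a bare Future that something else resolves later.
    fut = loop.create_future()
    loop.call_soon(fut.set_result, 6)
    from_future = await fut

    return from_coro, from_task, from_future


if __name__ == "__main__":
    print(asyncio.run(main()))
```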

Coroutine-style programming requires every function in the call chain, from bottom to top, to be defined with async def and called with await.

Two pitfalls lie here:

  1. The top level, e.g. __main__, runs in blocking mode, so it has to call the event loop functions and hand the tasks defined as coroutines to the loop. The loop then takes care of running, suspending, and terminating these tasks.

  2. Inside a coroutine, use await only on awaitable functions. Don't call blocking code from a coroutine, since it will jam the loop.
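Both pitfalls can be seen in a rough timing sketch. The blocking top level starts the loop with asyncio.run(), and a coroutine that calls time.sleep() stalls every other task. The names polite, rude, and timed are made up here, and the exact timings will vary by machine.

```python
import asyncio
import time


async def polite(delay):
    await asyncio.sleep(delay)  # suspends; other tasks can run meanwhile


async def rude(delay):
    time.sleep(delay)  # blocks the whole thread, and the loop with it


async def timed(factory, n=3, delay=0.1):
    # Run n copies of the coroutine together and measure the wall time.
    start = time.perf_counter()
    await asyncio.gather(*(factory(delay) for _ in range(n)))
    return time.perf_counter() - start


def main():
    # asyncio.run() is the blocking entry point that creates and runs the loop.
    concurrent = asyncio.run(timed(polite))
    serial = asyncio.run(timed(rude))
    return concurrent, serial


if __name__ == "__main__":
    concurrent, serial = main()
    print(f"asyncio.sleep: {concurrent:.2f}s, time.sleep: {serial:.2f}s")
```

The non-blocking version should take roughly one delay in total, while the blocking version takes roughly the sum of all delays.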

Using blocking libraries in async code

A blocking library jams the current thread, so it cannot be used directly in non-blocking code.

Python provides the loop.run_in_executor() method to call a blocking library in another thread or process.

awaitable loop.run_in_executor(executor, func, *args)

The executor argument should be a concurrent.futures.Executor instance. The default executor is used if executor is None.

func is the function from the blocking library.

executor can be either a thread pool or a process pool, i.e. a concurrent.futures.ThreadPoolExecutor or a concurrent.futures.ProcessPoolExecutor.

run_in_executor() schedules the blocking code in another thread or process and returns a Future object, so the call is awaitable and can be used with await in a coroutine. This is a mixed-mode solution: it creates new threads or processes, so it is not coroutine-only concurrency.
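A quick way to check both claims, assuming the default executor: the sketch below confirms that the return value is a Future and that the function ran in a different thread. blocking_call is a hypothetical stand-in for a real blocking library function.

```python
import asyncio
import threading
import time


def blocking_call():
    # Stand-in for a blocking library function (hypothetical).
    time.sleep(0.05)
    return threading.get_ident()


async def main():
    loop = asyncio.get_running_loop()
    fut = loop.run_in_executor(None, blocking_call)  # returns immediately
    is_future = asyncio.isfuture(fut)
    worker_thread = await fut  # the loop stays free while the thread works
    ran_elsewhere = worker_thread != threading.get_ident()
    return is_future, ran_elsewhere


if __name__ == "__main__":
    print(asyncio.run(main()))
```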

asyncio.get_running_loop()

Return the running event loop in the current OS thread.

If there is no running event loop a RuntimeError is raised. This function can only be called from a coroutine or a callback.

New in version 3.7.
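The RuntimeError behavior is easy to verify. The probe helper below is invented for this sketch. It returns False when called from plain synchronous code and True when called from inside a coroutine.

```python
import asyncio


def probe():
    # Report whether an event loop is running in the current thread.
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return False
    return True


async def main():
    return probe()


if __name__ == "__main__":
    print("outside a coroutine:", probe())
    print("inside a coroutine:", asyncio.run(main()))
```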

Example from the official documentation:

import asyncio
import concurrent.futures

def blocking_io():
    # File operations (such as logging) can block the
    # event loop: run them in a thread pool.
    with open('/dev/urandom', 'rb') as f:
        return f.read(100)

def cpu_bound():
    # CPU-bound operations will block the event loop:
    # in general it is preferable to run them in a
    # process pool.
    return sum(i * i for i in range(10 ** 7))

async def main():
    loop = asyncio.get_running_loop()

    ## Options:

    # 1. Run in the default loop's executor:
    result = await loop.run_in_executor(
        None, blocking_io)
    print('default thread pool', result)

    # 2. Run in a custom thread pool:
    with concurrent.futures.ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, blocking_io)
        print('custom thread pool', result)

    # 3. Run in a custom process pool:
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, cpu_bound)
        print('custom process pool', result)

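# The guard keeps ProcessPoolExecutor workers from re-running main()
# on platforms that start processes with "spawn".
if __name__ == '__main__':
    asyncio.run(main())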

So by properly wrapping run_in_executor(), it is possible to use a blocking library in a coroutine.

import functools
import asyncio
import httplib2
import time


def sync2async(f):
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        loop = asyncio.get_running_loop()
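        # run_in_executor() only forwards positional arguments,
        # so bind keyword arguments with functools.partial.
        return loop.run_in_executor(None, functools.partial(f, *args, **kwargs))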
    return wrapper


@sync2async
def get_resp(link):  # You cannot change this function
    h = httplib2.Http()
    resp, content = h.request(link, "GET")
    return resp


async def process_page(link):
    print(f"started at {time.strftime('%X')}")
    res = await get_resp(link)
    print(res['content-location'])
    print(f"finished at {time.strftime('%X')}")


async def main():
    links = ['https://docs.python.org/3/library/asyncio.html',
             'https://www.cnbeta.com/',  # slowest website, should be the last one to finish loading
             'https://vi.stackexchange.com/questions/5601/why-are-my-windows-scrolling-together']
    tasks = [asyncio.create_task(process_page(x)) for x in links]
    for t in tasks:
        await t

if __name__ == '__main__':
    asyncio.run(main()) 
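The same decorator idea can be tried without httplib2 or network access. In this sketch, slow_fetch is a hypothetical stand-in that sleeps instead of making a request, and keyword arguments are bound with functools.partial because run_in_executor() only forwards positional arguments.

```python
import asyncio
import functools
import time


def sync2async(f):
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        loop = asyncio.get_running_loop()
        # Bind all arguments up front; the executor then calls f with none.
        return loop.run_in_executor(None, functools.partial(f, *args, **kwargs))
    return wrapper


@sync2async
def slow_fetch(name, delay=0.2):
    # Hypothetical stand-in for a blocking library call.
    time.sleep(delay)
    return f"{name} done"


async def main():
    start = time.perf_counter()
    results = await asyncio.gather(
        slow_fetch("a"), slow_fetch("b", delay=0.2), slow_fetch("c"))
    return results, time.perf_counter() - start


if __name__ == "__main__":
    results, elapsed = main_result = asyncio.run(main())
    print(results, f"{elapsed:.2f}s")
```

The three calls should finish in roughly one delay rather than three, since each one runs in its own worker thread of the default executor.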

Conclusion

It is possible to make a blocking library compatible with coroutines. In the background, it is still multi-threaded or multi-process programming.

