# Awaitable

Three awaitables:

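  • coroutine: defined with async def; calling it returns a coroutine object that runs only when awaited or wrapped in a Task
  • Task: wraps a coroutine and schedules it to run concurrently on the event loop, in the same thread
  • Future: a low-level awaitable for a result produced somewhere else (the same thread, another thread, or another process); it is the parent class of Task and mirrors concurrent.futures.Future from synchronous code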

# Task Cancellation

asyncio.gather

  1. Bare gather + immediate cancel: the child tasks are cancelled before they ever start, so the wrapped coroutines do not run at all.
  2. If the gather is cancelled but never awaited, asyncio logs "_GatheringFuture exception was never retrieved".

Examples:

import asyncio
import datetime

async def keep_printing(name):
    """Print *name* and the current time every 0.5 s until cancelled.

    On cancellation, print 'ah cancelled!' and re-raise so the task still
    finishes in the cancelled state.
    """
    try:
        while True:
            print(name, end=" ")
            print(datetime.datetime.now())
            await asyncio.sleep(0.5)
    except asyncio.CancelledError:
        # CancelledError derives from BaseException (3.8+), so it must be named
        # explicitly. A broad ``except BaseException`` would also report
        # KeyboardInterrupt/SystemExit as a cancellation.
        print('ah cancelled!')
        raise

async def main_no_await():
    """Cancel a gather right after creating it, and never await it.

    The gather is cancelled before the loop gets a chance to start its
    children, so the printers never run.
    """
    labels = ("First", "Second", "Third")
    group_task = asyncio.gather(*(keep_printing(label) for label in labels))
    group_task.cancel()
    await asyncio.sleep(1)

async def main_no_await_cancel_later():
    """Let three printers run for one second, then cancel them without awaiting."""
    labels = ("First", "Second", "Third")
    printers = [keep_printing(label) for label in labels]
    group_task = asyncio.gather(*printers)
    await asyncio.sleep(1)
    # Nothing awaits group_task after this, so its outcome is never retrieved.
    group_task.cancel()

async def main_await():
    """Run three printers for one second, cancel the group, then await it."""
    printers = [keep_printing(label) for label in ("First", "Second", "Third")]
    group_task = asyncio.gather(*printers)

    await asyncio.sleep(1)
    group_task.cancel()
    # Awaiting the cancelled gather surfaces the CancelledError here.
    await group_task

# main_no_await:
#   1. keep_printing does not run at all
#   2. "_GatheringFuture exception was never retrieved" is logged
asyncio.run(main_no_await())

# main_no_await_cancel_later:
#   1. "_GatheringFuture exception was never retrieved" is logged
asyncio.run(main_no_await_cancel_later())

# main_await:
#   1. CancelledError is raised out of asyncio.run()
asyncio.run(main_await())

asyncio.Task

Calling cancel() makes the Task throw a CancelledError into the wrapped coroutine at its next opportunity. If the coroutine is awaiting a Future when it is cancelled, that Future is cancelled as well (joint liability).

Note that if the cancelled task is never awaited, the code that called cancel() never sees a CancelledError. The wrapped coroutine still receives CancelledError as usual, but the exception only propagates to whoever awaits the task, and here nothing does.

auto cancel

When asyncio.run() finishes, any tasks that are still pending are cancelled before the event loop is closed.

Examples:

async def small_task(name):
    """Sleep for an hour; if cancelled, announce it and propagate the cancel."""
    sleeper = asyncio.sleep(3600)
    try:
        await sleeper
    except asyncio.CancelledError:
        notice = f'{name} small_task(): cancel sleep'
        print(notice)
        raise

async def cancel_me():
    """Start two small_task() tasks, then wait for them one after the other."""
    first, second = (asyncio.ensure_future(small_task(n)) for n in (1, 2))
    # Cancelling this task while it waits on `first` also cancels `first`.
    # `second` is not being awaited at that moment, so it keeps running.
    for pending in (first, second):
        await pending

async def main_no_await():
    """Cancel cancel_me() after a second without awaiting it, then idle for 10 s."""
    background = asyncio.create_task(cancel_me())
    await asyncio.sleep(1)
    background.cancel()
    # `background` is never awaited, so its CancelledError never reaches us.
    await asyncio.sleep(10)

async def main_await():
    """Cancel cancel_me() after one second, then await it so the cancel surfaces here.

    Awaiting the cancelled task re-raises CancelledError in main_await().
    The still-pending t2 inside cancel_me() is cancelled when asyncio.run()
    shuts down.
    """
    task = asyncio.create_task(cancel_me())
    await asyncio.sleep(1)
    # Task.cancel() only requests cancellation and returns a bool. Awaiting
    # that bool (`await task.cancel()`) raises TypeError. Await the task instead.
    task.cancel()
    await task

# main_no_await: the wrapped coroutine is cancelled as usual, but no exception
# reaches main_no_await because it never awaits the task. t2 is only
# cancelled when the program ends.
asyncio.run(main_no_await())

# main_await: awaiting the cancelled task raises CancelledError in main_await.
# Note that `await task.cancel()` would raise TypeError instead, because
# Task.cancel() returns a bool. t2 is again cancelled when the program ends.
asyncio.run(main_await())

References:

  1. python issue

# Circular await

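A RecursionError (maximum recursion depth exceeded) is raised. Cancelling f_task cancels the task it is awaiting (g_task), which cancels the task g_task is awaiting (f_task), and so on without end.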

import asyncio


async def test():
    """Demonstrate the RecursionError caused by cancelling two tasks that await each other."""
    # f_task awaits g_task and g_task awaits f_task, so neither can finish.
    async def f():
        await g_task

    async def g():
        await f_task

    # f and g only look up f_task/g_task when they first run. That happens
    # after both assignments below, once test() yields at the gather.
    f_task = asyncio.create_task(f())
    g_task = asyncio.create_task(g())

    async def release():
        # After 5 s, try to break the deadlock. Cancelling f_task forwards the
        # cancel to the task it awaits (g_task), which forwards it back to
        # f_task, and so on until recursion depth is exceeded.
        await asyncio.sleep(5)
        f_task.cancel()

    await asyncio.gather(f_task, g_task, release())

asyncio.run(test())

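The task fails immediately with RuntimeError ("Task cannot await on itself"). recursion() and unit_task() run inside main's own task, so unit_task ends up awaiting the task it is running in.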

async def unit_task():
    """Find the task(s) whose coroutine is named ``main`` and await them.

    Used to demonstrate a task awaiting itself, directly or through a cycle.
    """
    for task in asyncio.all_tasks():
        # Use the public Task.get_coro() rather than the private ``_coro``.
        # get_coro() can return None (eagerly completed tasks, 3.12+), and the
        # wrapped awaitable need not be a native coroutine, so cr_code is
        # fetched defensively.
        code = getattr(task.get_coro(), 'cr_code', None)
        if code is not None and code.co_name == 'main':
            print(code.co_name, code.co_filename, 'unit')
            print(task)
            await task

async def recursion():
    """Await unit_task() directly, inside the caller's own task (no new Task)."""
    pending = unit_task()
    await pending

async def main():
    """Entry point whose task unit_task() locates by the coroutine name 'main'."""
    # The name 'main' is load-bearing: unit_task() searches asyncio.all_tasks()
    # for a task whose coroutine is named 'main'. recursion() and unit_task()
    # run inside this same task, so unit_task awaits the task it is running in.
    await recursion()

asyncio.run(main())

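The tasks hang. main's task awaits t1 (recursion), t1 awaits t2 (unit_task), and t2 awaits main's task, so none of them can finish. If interrupted, cancelling the cycle recurses as in the first example, and a maximum recursion depth error is raised at program end.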


async def unit_task():
    """Find the task(s) whose coroutine is named ``main`` and await them.

    Used to demonstrate a task awaiting itself, directly or through a cycle.
    """
    for task in asyncio.all_tasks():
        # Use the public Task.get_coro() rather than the private ``_coro``.
        # get_coro() can return None (eagerly completed tasks, 3.12+), and the
        # wrapped awaitable need not be a native coroutine, so cr_code is
        # fetched defensively.
        code = getattr(task.get_coro(), 'cr_code', None)
        if code is not None and code.co_name == 'main':
            print(code.co_name, code.co_filename, 'unit')
            print(task)
            await task

async def recursion():
    """Run unit_task() in a separate Task and wait for that Task."""
    await asyncio.create_task(unit_task())

async def main():
    """Entry point whose task unit_task() locates by the coroutine name 'main'."""
    # A cycle of separate Tasks: main's task awaits t1 (recursion), t1 awaits
    # t2 (unit_task), and t2 awaits main's task, so none of them ever finishes.
    t1 = asyncio.create_task(recursion())
    await t1


asyncio.run(main())

# ContextVars

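When a new Task is created from a coroutine, the current contextvars context is copied and the task runs in that copy. ContextVar changes made inside the task are therefore not visible to the code that created it.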

# Concurrent.futures integration

ProcessPoolExecutor

async def cpu_bound(n: int = 10 ** 7):
    """Return sum(i * i for i in range(n)) after printing the current PID.

    CPU-bound work like this blocks whichever event loop runs it, so in
    general it is preferable to run it in a process pool.

    Args:
        n: number of terms to sum. The default of 10 ** 7 matches the asyncio
           docs example; 10 ** 70 iterations would never finish.
    """
    print(os.getpid())
    return sum(i * i for i in range(n))


# Names used by cpu_bound and the ProcessPoolExecutor example.
import asyncio
import concurrent.futures
import os
from asyncio import AbstractEventLoop
from typing import Dict, Set

# One private event loop per process, keyed by PID and filled lazily by
# dispatch_coroutines(). Each process has its own copy of this dict.
event_loops_for_each_process: Dict[int, AbstractEventLoop] = {}

def dispatch_coroutines(corofn, *args):
    """Schedule ``corofn(*args)`` as a task on this process's private event loop.

    The loop is created on first use and cached in event_loops_for_each_process.
    The task is only created here; it runs later, when wait_loops() drives the loop.

    NOTE(review): when this runs in a ProcessPoolExecutor, the returned Task
    has to be pickled back to the parent, which asyncio.Task does not appear
    to support. Confirm.
    """
    pid = os.getpid()
    try:
        process_loop = event_loops_for_each_process[pid]
    except KeyError:
        process_loop = event_loops_for_each_process[pid] = asyncio.new_event_loop()
    return process_loop.create_task(corofn(*args))

async def async_gather_tasks(all_tasks: Set[asyncio.Task]):
    """Await every task in *all_tasks*; results follow the set's iteration order."""
    results = await asyncio.gather(*all_tasks)
    return results

def wait_loops():
    """Run this process's event loop until every pending task on it completes.

    Returns:
        The results of the tasks that were pending on this process's loop, in
        the iteration order of the set returned by asyncio.all_tasks(). Returns
        [] when this process never ran dispatch_coroutines() and so has no loop.
    """
    curr_pid = os.getpid()
    processes_event_loop = event_loops_for_each_process.get(curr_pid)
    if processes_event_loop is None:
        # The pool chooses which worker runs each call, so this worker may have
        # no loop. There is nothing to wait for. Indexing the dict would raise
        # KeyError and fail the caller's gather().
        return []
    print(f'Process {curr_pid} will wait its tasks.')
    pending = asyncio.all_tasks(processes_event_loop)
    return processes_event_loop.run_until_complete(async_gather_tasks(pending))

async def main():
    """Fan cpu_bound out to per-process event loops, then drain each loop.

    Phase 1 submits dispatch_coroutines so worker processes create tasks on
    their own loops, which are not yet running. Phase 2 submits wait_loops so
    each worker runs its loop until its batch of tasks completes. Waiting is
    therefore batched per worker instead of one task at a time.
    """
    running_loop = asyncio.get_running_loop()
    worker_count = 2
    with concurrent.futures.ProcessPoolExecutor(max_workers=worker_count) as pool:
        # Phase 1: schedule one cpu_bound task per submission.
        # NOTE(review): dispatch_coroutines returns an asyncio.Task, which must
        # be pickled to come back from the worker. These futures presumably
        # finish with an exception that asyncio.wait() never retrieves. Confirm.
        dispatched = [
            running_loop.run_in_executor(pool, dispatch_coroutines, cpu_bound)
            for _ in range(worker_count)
        ]
        await asyncio.wait(dispatched)
        # Phase 2: have the workers run their loops to completion.
        # NOTE(review): the pool, not this code, decides which worker takes each
        # call. Both calls may land on one worker and leave another worker's
        # tasks unrun. Confirm.
        draining = [
            running_loop.run_in_executor(pool, wait_loops)
            for _ in range(worker_count)
        ]
        await asyncio.gather(*draining)

# The __main__ guard is required on Windows (and wherever the "spawn" start
# method is used): worker processes re-import this module, and without the
# guard each import would execute asyncio.run(main()) again.
# reason: https://stackoverflow.com/a/20222706
if __name__ == '__main__':
    asyncio.run(main())

ThreadPoolExecutor

import asyncio
import threading
from asyncio import AbstractEventLoop
from concurrent.futures import ThreadPoolExecutor
from time import perf_counter
from typing import Dict, Set

# One private event loop per worker thread, keyed by native thread id and
# filled lazily by dispatch_coroutines().
event_loops_for_each_thread: Dict[int, AbstractEventLoop] = {}


def dispatch_coroutines(corofn, *args):
    """Schedule ``corofn(*args)`` as a task on the calling thread's own event loop.

    The loop is created on first use and cached in event_loops_for_each_thread.
    The task only runs later, when wait_loops() drives that loop.
    """
    tid = threading.get_native_id()
    thread_loop = event_loops_for_each_thread.get(tid)
    if thread_loop is None:
        thread_loop = asyncio.new_event_loop()
        event_loops_for_each_thread[tid] = thread_loop
    pending_coro = corofn(*args)
    return thread_loop.create_task(pending_coro)


async def async_gather_tasks(all_tasks: Set[asyncio.Task]):
    """Gather *all_tasks* concurrently; results follow the set's iteration order."""
    pending = tuple(all_tasks)
    return await asyncio.gather(*pending)


def wait_loops():
    """Run this thread's event loop until every pending task on it completes.

    Returns:
        The results of the tasks that were pending on this thread's loop, in
        the iteration order of the set returned by asyncio.all_tasks(). Returns
        [] when this thread never ran dispatch_coroutines() and so has no loop.
    """
    curr_thread_id = threading.get_native_id()
    threads_event_loop = event_loops_for_each_thread.get(curr_thread_id)
    if threads_event_loop is None:
        # The executor chooses which thread runs each call, so this thread may
        # have no loop. There is nothing to wait for. Indexing the dict would
        # raise KeyError and fail the caller's gather().
        return []

    print(f'Thread {curr_thread_id} will wait its tasks.')
    pending = asyncio.all_tasks(threads_event_loop)
    return threads_event_loop.run_until_complete(async_gather_tasks(pending))


async def main():
    """Spread ten 1-second sleeps over per-thread event loops, then drain them.

    Phase 1 runs dispatch_coroutines in the pool, so worker threads create
    asyncio.sleep(1, x) tasks on their own loops, which are not yet running.
    Phase 2 submits one wait_loops call per worker to run those loops to
    completion. It then prints the list of per-thread result lists.
    """
    # get_running_loop() is the documented way to get the loop inside a coroutine.
    loop = asyncio.get_running_loop()
    max_workers = 5
    # The context manager shuts the pool down, joining its threads, once both
    # phases are done, instead of leaving it alive.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            loop.run_in_executor(executor, dispatch_coroutines, asyncio.sleep, 1, x)
            for x in range(10)
        ]
        await asyncio.wait(futures)
        # NOTE(review): the executor decides which thread runs each wait_loops
        # call. Nothing guarantees exactly one call per thread that holds tasks.
        # Confirm.
        futures = [
            loop.run_in_executor(executor, wait_loops)
            for _ in range(max_workers)
        ]
        print(await asyncio.gather(*futures))


if __name__ == '__main__':
    start = perf_counter()
    # asyncio.run() creates, runs and closes a fresh event loop. Calling
    # asyncio.get_event_loop() at module level with no running loop is
    # deprecated, and that loop would never be closed.
    asyncio.run(main())
    end = perf_counter()
    duration_s = end - start
    print(f'duration_s={duration_s:.3f}')

Caveat:

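  1. loop.run_in_executor() accepts only regular (synchronous) callables. A coroutine function passed to it just returns a coroutine object that is never awaited, which is why dispatch_coroutines is used above.
  2. Use threading.get_native_id() to identify threads and os.getpid() to distinguish processes.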

Reference

  1. loop.run_in_executor python docs
  2. Use coroutine with run_in_executor
Last Updated: 2/1/2024, 4:22:58 PM