# Awaitable
Three awaitables:
- coroutine: defined with `async def`
- Task: schedules a coroutine to run in the same thread
- Future: schedules work to run somewhere (same thread, another thread or process); parent class of Task, meant to mimic `concurrent.futures.Future` from synchronous programming
# Task Cancellation
asyncio.gather
- bare gather + immediate cancel: the wrapped coroutines never start running at all
- after cancel, if the gather is not awaited => "_GatheringFuture exception was never retrieved" is reported
Examples:
import asyncio
import datetime
async def keep_printing(name):
    """Print ``name`` and the current time every 0.5 s until cancelled.

    Args:
        name: Label printed in front of each timestamp.

    Raises:
        asyncio.CancelledError: Re-raised after the cancellation has been
            reported, so whoever awaits this coroutine still observes it.
    """
    try:
        while True:
            print(name, end=" ")
            print(datetime.datetime.now())
            await asyncio.sleep(0.5)
    except asyncio.CancelledError:
        # Only a cancellation is reported as one; any other error propagates
        # untouched instead of being mislabelled as "cancelled".
        print('ah cancelled!')
        raise
async def main_no_await():
    """Gather three printers, cancel the group at once, and never await it."""
    labels = ("First", "Second", "Third")
    gathered = asyncio.gather(*(keep_printing(label) for label in labels))
    gathered.cancel()
    # The group is deliberately left un-awaited; just keep the loop running.
    await asyncio.sleep(1)
async def main_no_await_cancel_later():
    """Gather three printers, let them run for a second, then cancel without awaiting."""
    printers = [keep_printing(label) for label in ("First", "Second", "Third")]
    gathered = asyncio.gather(*printers)
    # Let the printers run for a second before the group is cancelled.
    await asyncio.sleep(1)
    # The cancelled group is deliberately never awaited.
    gathered.cancel()
async def main_await():
    """Gather three printers, cancel the group after a second, then await it."""
    first = keep_printing("First")
    second = keep_printing("Second")
    third = keep_printing("Third")
    gathered = asyncio.gather(first, second, third)
    await asyncio.sleep(1)
    gathered.cancel()
    # Awaiting the cancelled group is the step the other two demos leave out.
    await gathered
# Noted outcome for main_no_await:
#   1. keep_printing not running at all
#   2. _GatheringFuture exception was never retrieved
asyncio.run(main_no_await())

# Noted outcome for main_no_await_cancel_later:
#   1. _GatheringFuture exception was never retrieved
asyncio.run(main_no_await_cancel_later())

# Noted outcome for main_await:
#   1. CancelledError thrown
asyncio.run(main_await())
asyncio.Task
Calling `cancel()` causes the Task to throw a `CancelledError` exception into the wrapped coroutine. If the coroutine is awaiting a Future object at the moment of cancellation, that Future is cancelled as well (joint liability).
Note that if the cancelled task is not awaited, `CancelledError` is not thrown to the code that cancelled it. The wrapped coroutine still receives `CancelledError` as usual, but the exception is not propagated out of the original un-awaited task.
auto cancel
When the loop is closed, any task that is still running is cancelled as well.
Examples:
async def small_task(name):
    """Sleep for an hour; report and re-raise if cancelled while sleeping.

    Args:
        name: Identifier included in the cancellation message.
    """
    one_hour = 3600
    try:
        await asyncio.sleep(one_hour)
    except asyncio.CancelledError:
        message = f'{name} small_task(): cancel sleep'
        print(message)
        raise
async def cancel_me():
    """Start two ``small_task`` instances and wait for them one after the other."""
    # Both tasks are scheduled before either one is awaited.
    first, second = [asyncio.ensure_future(small_task(n)) for n in (1, 2)]
    await first
    await second
async def main_no_await():
    """Cancel the ``cancel_me`` task after one second without awaiting it."""
    pending = asyncio.create_task(cancel_me())
    await asyncio.sleep(1)
    pending.cancel()
    # The cancelled task is never awaited; keep the loop running for ten
    # more seconds before returning.
    await asyncio.sleep(10)
async def main_await():
    """Cancel the ``cancel_me`` task after one second and then await it.

    Raises:
        asyncio.CancelledError: Propagated from ``await task`` once the
            cancellation has been delivered to the wrapped coroutine.
    """
    task = asyncio.create_task(cancel_me())
    await asyncio.sleep(1)
    # Task.cancel() only requests cancellation and returns a bool, which is
    # not awaitable; the task itself must be awaited to observe the
    # CancelledError.
    task.cancel()
    await task
    # Only reached if the task had already finished before cancel() was called.
    await asyncio.sleep(10)
# Noted outcome for main_no_await:
#   1. wrapped coroutine cancelled as usual, but main_no_await no exception thrown
#   2. t2 is cancelled at program end
asyncio.run(main_no_await())

# Noted outcome for main_await:
#   1. main_await throws CancelledError
#   2. t2 is cancelled at program end
asyncio.run(main_await())
References:
# Circular await
A maximum-recursion exception is thrown because cancelling f cancels g, which cancels f, which cancels g, and so on.
import asyncio
async def test():
    """Set up two tasks that await each other, then cancel one of them.

    ``f`` awaits ``g_task`` and ``g`` awaits ``f_task``; ``release`` cancels
    ``f_task`` after five seconds while all three are gathered.
    """
    async def f():
        # g_task is a closure variable assigned further down; it is looked up
        # only when this body actually runs.
        await g_task

    async def g():
        await f_task

    f_task = asyncio.create_task(f())
    g_task = asyncio.create_task(g())

    async def release():
        # Wait five seconds, then cancel one side of the cycle.
        await asyncio.sleep(5)
        f_task.cancel()

    await asyncio.gather(f_task, g_task, release())
# Entry point for the circular-await demo.
asyncio.run(test())
The coroutine fails immediately with an error saying that it cannot await on itself.
async def unit_task():
    """Await every task on the running loop whose coroutine is named ``main``."""
    for candidate in asyncio.all_tasks():
        code = candidate._coro.cr_code
        if code.co_name != 'main':
            continue
        print(code.co_name, code.co_filename, 'unit')
        print(candidate)
        await candidate
async def recursion():
    """Await ``unit_task`` directly as a coroutine, without creating a new task."""
    await unit_task()
async def main():
    """Entry coroutine; ``unit_task`` searches for a task whose coroutine has this name."""
    await recursion()
# Entry point for the "task awaits itself" demo.
asyncio.run(main())
The tasks hang; if the program is interrupted, a maximum-recursion error is raised at program end.
async def unit_task():
    """Find tasks on the running loop whose coroutine is named ``main`` and await them."""
    for running in asyncio.all_tasks():
        coro_code = running._coro.cr_code
        is_main = coro_code.co_name == 'main'
        if is_main:
            print(coro_code.co_name, coro_code.co_filename, 'unit')
            print(running)
            await running
async def recursion():
    """Run ``unit_task`` in its own task and wait for that task."""
    await asyncio.create_task(unit_task())
async def main():
    """Entry coroutine: run ``recursion`` as a separate task and wait for it."""
    await asyncio.create_task(recursion())
# Entry point for the "tasks await each other through main" demo.
asyncio.run(main())
# ContextVars
When a new Task is created from a coroutine, the current context is copied.
# Concurrent.futures integration
ProcessPoolExecutor
async def cpu_bound(limit: int = 10 ** 7):
    """Print the current process id and return the sum of squares below ``limit``.

    Args:
        limit: Exclusive upper bound of the summed range. The default keeps
            the demo CPU-heavy yet finite.

    Returns:
        ``sum(i * i for i in range(limit))``.
    """
    # CPU-bound operations will block the event loop:
    # in general it is preferable to run them in a
    # process pool.
    print(os.getpid())
    return sum(i * i for i in range(limit))
# Event loops keyed by os.getpid(); entries are created lazily by dispatch_coroutines.
event_loops_for_each_process: Dict[int, AbstractEventLoop] = {}
def dispatch_coroutines(corofn, *args):
    """Schedule ``corofn(*args)`` as a task on the calling process's event loop.

    The loop is created on first use and stored in
    ``event_loops_for_each_process`` under the current pid.

    NOTE(review): when this runs inside a ProcessPoolExecutor worker the
    returned Task presumably cannot be pickled back to the parent; confirm.

    Args:
        corofn: Coroutine function to call.
        *args: Positional arguments forwarded to ``corofn``.

    Returns:
        The task created on this process's loop.
    """
    pid = os.getpid()
    loop = event_loops_for_each_process.get(pid)
    if loop is None:
        loop = event_loops_for_each_process[pid] = asyncio.new_event_loop()
    return loop.create_task(corofn(*args))
async def async_gather_tasks(all_tasks: Set[asyncio.Task]):
    """Wait for every task in ``all_tasks`` and return their gathered results."""
    gathered = asyncio.gather(*all_tasks)
    results = await gathered
    return results
def wait_loops():
    """Run the calling process's event loop until all of its tasks are done.

    Returns:
        The gathered task results, or an empty list when no loop was ever
        registered for this process (it received no dispatched coroutine).
    """
    curr_pid = os.getpid()
    # A worker that never ran dispatch_coroutines has no loop; treat that as
    # "nothing to wait for" instead of failing with KeyError.
    processes_event_loop = event_loops_for_each_process.get(curr_pid)
    if processes_event_loop is None:
        return []
    print(f'Process {curr_pid} will wait its tasks.')
    return processes_event_loop.run_until_complete(
        async_gather_tasks(asyncio.all_tasks(processes_event_loop))
    )
async def main():
    """Dispatch coroutines to worker processes, then let each worker drain its batch.

    Tasks are handed out first; afterwards every worker waits for its whole
    batch at once rather than waiting for one task at a time.
    """
    worker_count = 2
    running_loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor(max_workers=worker_count) as pool:
        # Phase 1: submit the dispatch jobs so the workers accumulate tasks.
        dispatched = [
            running_loop.run_in_executor(pool, dispatch_coroutines, cpu_bound)
            for _ in range(worker_count)
        ]
        await asyncio.wait(dispatched)
        # Phase 2: each worker executes and waits for its own tasks.
        waiters = [
            running_loop.run_in_executor(pool, wait_loops)
            for _ in range(worker_count)
        ]
        await asyncio.gather(*waiters)
# The `if __name__ == '__main__'` guard is mandatory on Windows.
# Reason: https://stackoverflow.com/a/20222706
if __name__ == '__main__':
    asyncio.run(main())
ThreadPoolExecutor
import asyncio
import threading
from asyncio import AbstractEventLoop
from concurrent.futures import ThreadPoolExecutor
from time import perf_counter
from typing import Dict, Set
# Event loops keyed by threading.get_native_id(); entries are created lazily by dispatch_coroutines.
event_loops_for_each_thread: Dict[int, AbstractEventLoop] = {}
def dispatch_coroutines(corofn, *args):
    """Schedule ``corofn(*args)`` as a task on the calling thread's event loop.

    The loop is created on first use and stored in
    ``event_loops_for_each_thread`` under the thread's native id.

    Args:
        corofn: Coroutine function to call.
        *args: Positional arguments forwarded to ``corofn``.

    Returns:
        The task created on this thread's loop.
    """
    thread_id = threading.get_native_id()
    try:
        loop = event_loops_for_each_thread[thread_id]
    except KeyError:
        loop = event_loops_for_each_thread[thread_id] = asyncio.new_event_loop()
    return loop.create_task(corofn(*args))
async def async_gather_tasks(all_tasks: Set[asyncio.Task]):
    """Gather the given tasks and return their results once all have finished."""
    outcome = await asyncio.gather(*all_tasks)
    return outcome
def wait_loops():
    """Run the calling thread's event loop until all of its tasks are done.

    Returns:
        The gathered task results, or an empty list when no loop was ever
        registered for this thread (it received no dispatched coroutine).
    """
    curr_thread_id = threading.get_native_id()
    # A worker thread that never ran dispatch_coroutines has no loop; treat
    # that as "nothing to wait for" instead of failing with KeyError.
    threads_event_loop = event_loops_for_each_thread.get(curr_thread_id)
    if threads_event_loop is None:
        return []
    print(f'Thread {curr_thread_id} will wait its tasks.')
    return threads_event_loop.run_until_complete(
        async_gather_tasks(asyncio.all_tasks(threads_event_loop))
    )
async def main():
    """Fan ten ``asyncio.sleep`` coroutines out to worker threads and print the results.

    The coroutines are dispatched first, so each worker thread accumulates
    tasks on its own event loop. The workers are then asked to run their
    loops until those tasks have finished.
    """
    # Inside a coroutine, the running loop is the one to hand work to.
    loop = asyncio.get_running_loop()
    max_workers = 5
    # The context manager shuts the pool down instead of leaking its threads.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            loop.run_in_executor(executor, dispatch_coroutines, asyncio.sleep, 1, x)
            for x in range(10)
        ]
        await asyncio.wait(futures)
        futures = [
            loop.run_in_executor(executor, wait_loops)
            for _ in range(max_workers)
        ]
        print(await asyncio.gather(*futures))
if __name__ == '__main__':
    # asyncio.run creates, runs and closes the event loop. Calling
    # asyncio.get_event_loop() with no running loop is deprecated, and the
    # loop it returned was never closed.
    start = perf_counter()
    asyncio.run(main())
    end = perf_counter()
    duration_s = end - start
    print(f'duration_s={duration_s:.3f}')
Caveat:
- `loop.run_in_executor` accepts sync functions only
- use `threading.get_native_id()` to identify threads, and `os.getpid()` to distinguish between processes
Reference