是否可以为一个 asyncio 任务赋予更高的优先级以实现更精确的超时处理?

Is it possible to give one asyncio task a higher priority to achieve a more precise timout handling?

给定一个 io 和 cpu 绑定的任务列表。当使用 asyncio.wait 超时等待它们时,超时触发很晚。是否可以优先处理未决超时?

此示例定义了模拟 1 秒 cpu 和 1 秒 io 绑定操作的工作程序。它并行运行其中一些并等待 10 秒。

async def worker():
    for _ in range(200):
        # approx one second of cpu bound operations
        for _ in range(10000000):
            random.random()

        # one second of io bound operations
        await asyncio.sleep(1)

async def main(num_tasks):
    tasks = []
    for _ in range(num_tasks):
        tasks.append(asyncio.create_task(worker()))

    time_start = time.monotonic()
    done, pending = await asyncio.wait(tasks, timeout=10)
    print(f'with {num_tasks} tasks waited for {time.monotonic() - time_start:.3}s')

    for task in pending:
        task.cancel()

for num_tasks in range(1, 11):
    asyncio.run(main(num_tasks))

随着任务越来越多,超时时间越来越不准确。

with 1 tasks waited for 10.0s
with 2 tasks waited for 10.9s
with 3 tasks waited for 11.5s
with 4 tasks waited for 12.0s
with 5 tasks waited for 13.2s
with 6 tasks waited for 14.2s
with 7 tasks waited for 15.6s
with 8 tasks waited for 16.8s
with 9 tasks waited for 17.9s
with 10 tasks waited for 18.6s

在我看来,当其中一名工人因 asyncio.sleep 挂起时,应该触发其超时的主协程不会立即被调度。在处理挂起的超时之前安排了多个工作人员。使用 loop.call_later 安排的基础回调似乎与任何其他任务一样对待。

是否可以告诉事件循环优先处理主要任务或任何待处理的回调?

事件循环不支持您所追求的那种优先级。 Asyncio 期望在事件循环协同程序和回调中执行的所有操作都是“快速的”——究竟多快是一个解释问题,但它们需要足够快,以免影响程序的延迟。

为了支持CPU绑定执行,有run_in_executor,它将给定的操作提交给线程池并挂起当前协程直到完成,允许其他协程继续进行。在您的情况下,工作人员将如下所示:

def cpu_bound():
    for _ in range(10000000):
        random.random()

async def worker():
    loop = asyncio.get_event_loop()
    for _ in range(200):
        await loop.run_in_executor(None, cpu_bound)
        await asyncio.sleep(1)

这样修改后,无论任务数量多少,您的程序都会等待 ~10 秒。

在Python 3.9及以后的版本中,您还可以使用asyncio.to_thread