AsyncIO. How to call 1000 methods asynchronously and get each result as soon as it is ready

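I'd like your advice. I want to understand how asyncio works through a simple example. The task is to create 1000 workers that each return some result, but each result has to be returned as soon as it is ready. Here is an example: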

import asyncio


async def worker(number):
    print("worker # %d" % number)
    await asyncio.sleep(0)
    return str(number)


async def print_when_done(tasks):
    for res in asyncio.as_completed(tasks):
        print("Result worker %s" % await res)


coros = [worker(i) for i in range(10)]
loop = asyncio.get_event_loop()
loop.run_until_complete(print_when_done(coros))
loop.close()

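The problem is that this example does not behave the way I expect: it simply calls the functions without blocking the main process, and at the end it returns the responses of all functions: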

worker # 2
worker # 3
worker # 4
worker # 1
worker # 0
Result worker 2
Result worker 3
Result worker 4
Result worker 1
Result worker 0

But how do I get a result similar to this:

worker # 2
worker # 3
worker # 4
Result worker 3
Result worker 2
worker # 1
Result worker 4
worker # 0
Result worker 1
Result worker 0

You can of course create a ThreadPoolExecutor or a ProcessPoolExecutor. But then why do you need asyncio, when you can create threads without it and work with them?

You are looking for asyncio.wait:

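import asyncio
from concurrent.futures import FIRST_COMPLETED


async def print_when_done(coros):
    # asyncio.wait needs tasks/futures; bare coroutines are rejected on Python 3.11+.
    pending = [asyncio.ensure_future(c) for c in coros]

    while pending:
        # Wake up as soon as at least one task has finished.
        done, pending = await asyncio.wait(pending, return_when=FIRST_COMPLETED)

        for res in done:
            # res is a finished Task; .result() gives the worker's return value.
            print("Result worker %s" % res.result())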
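To try the asyncio.wait approach end to end, here is a minimal self-contained sketch. The `delay` parameter, the fixed delays, and the helper names `collect_as_done` and `run_demo` are assumptions made for illustration (they are not in the original code); fixed delays are used only so that the completion order is predictable.

```python
import asyncio


async def worker(number, delay):
    # Hypothetical worker: sleeps for `delay` seconds to simulate I/O.
    await asyncio.sleep(delay)
    return str(number)


async def collect_as_done(coros):
    # Wrap the coroutines in tasks so that asyncio.wait accepts them.
    pending = {asyncio.ensure_future(c) for c in coros}
    results = []
    while pending:
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        for task in done:
            print("Result worker %s" % task.result())
            results.append(task.result())
    return results


def run_demo():
    # Worker 0 is the slowest, worker 2 the fastest.
    coros = [worker(0, 0.3), worker(1, 0.2), worker(2, 0.1)]
    return asyncio.run(collect_as_done(coros))


if __name__ == "__main__":
    run_demo()
```

With these delays the results are reported in the order 2, 1, 0, which is completion order rather than submission order.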

But then why do you need Asyncio, you can create threads without it and work with them.

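Of course, threads can be more efficient and you can do more with them, but single-threaded asynchronous cooperative multitasking is easier to coordinate.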

it simply calls the function without blocking the main process, and at the end it returns the responses of all functions

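It starts all the workers at the same time, which is how it should be, computes their results immediately (because the workers do not actually contain any blocking I/O), and returns the results at the same time.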

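If you want to see the workers return results at different times, you should make them take different amounts of time to execute, for example by putting await asyncio.sleep(randint(1, 3)) in place of your 0-sleep.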
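As a rough illustration of that point, the sketch below gives each worker a different sleep time and consumes the results with asyncio.as_completed. The `delay` parameter and the fixed delays are assumptions chosen so that the order is reproducible; a random sleep behaves the same way, just unpredictably.

```python
import asyncio


async def worker(number, delay):
    print("worker # %d" % number)
    await asyncio.sleep(delay)  # stands in for real I/O
    return str(number)


async def main():
    # Assumed delays: higher-numbered workers finish sooner.
    delays = [0.3, 0.2, 0.1]
    coros = [worker(i, d) for i, d in enumerate(delays)]

    finished = []
    for fut in asyncio.as_completed(coros):
        result = await fut  # resolves in completion order
        print("Result worker %s" % result)
        finished.append(result)
    return finished


if __name__ == "__main__":
    asyncio.run(main())
```

All three "worker #" lines are still printed first, because every worker starts before any of them finishes sleeping; only the "Result worker" lines are spread out over time, arriving as 2, 1, 0.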


I'm not sure I understand why you want this:

worker # 2
worker # 3
worker # 4
Result worker 3

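Since you have a print at the top of each worker (with no blocking I/O operation before it) and you run all the workers at once, you will see all of their prints immediately, before any result.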

My random guess is that you may want to limit the number of workers running in parallel? In that case you can use a synchronization primitive like asyncio.Semaphore.

Here is an example that contains all of the above:

import asyncio
from random import randint

sem = asyncio.Semaphore(3)  # don't allow more than 3 workers to run at once


async def worker(number):
    async with sem:
        print("started # %d" % number)
        await asyncio.sleep(randint(1, 3))
        return str(number)


async def main():
    coros = [worker(i) for i in range(10)]

    for res in asyncio.as_completed(coros):
        print("finished %s" % await res)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()

Output:

started # 0
started # 6
started # 7
started # 2
finished 7
started # 8
finished 0
started # 3
finished 6
started # 9
finished 2
started # 4
finished 8
started # 1
started # 5
finished 3
finished 9
finished 4
finished 1
finished 5
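One way to convince yourself that the semaphore really caps concurrency is to count how many workers are inside the `async with` block at any moment. The sketch below is only an illustration under assumptions: the `state` dictionary, the `limit` and `count` parameters, and the short fixed sleep are not part of the example above.

```python
import asyncio


async def worker(number, sem, state):
    async with sem:
        # Track how many workers hold the semaphore right now.
        state["running"] += 1
        state["peak"] = max(state["peak"], state["running"])
        await asyncio.sleep(0.01)  # stands in for real I/O
        state["running"] -= 1
        return str(number)


async def main(limit=3, count=10):
    sem = asyncio.Semaphore(limit)  # created inside the running loop
    state = {"running": 0, "peak": 0}
    coros = [worker(i, sem, state) for i in range(count)]

    # Collect results in completion order.
    results = [await fut for fut in asyncio.as_completed(coros)]
    return sorted(results, key=int), state["peak"]


if __name__ == "__main__":
    results, peak = asyncio.run(main())
    print("results:", results)
    print("peak concurrency:", peak)
```

Raising `count` to 1000 should leave the peak at `limit`, since each worker increments and decrements the counter while it still holds the semaphore.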