不能在期货清单上 运行 asyncio.wait(...)

Can't run asyncio.wait(...) on a list of futures

我正在将协程提交到单独线程中的事件循环。当我按 future.next() 顺序等待每个未来时,这一切都很好。但我现在想等待期货清单中第一个完成的未来。我正在尝试为此使用 asyncio.wait(...),但我似乎使用不正确。

下面是一个简化的例子。我在 done, pending = future.result() 行收到异常 TypeError: An asyncio.Future, a coroutine or an awaitable is required

如果我将 [c1, c2, c3] 传递给 asyncio.wait([c1, c2, c3], return_when=asyncio.FIRST_COMPLETE),这会起作用,但我是随机提交任务,所以我只能收集未来的集合,而不是原始任务。并且文档明确说明可以使用 futures。

coroutine asyncio.wait(futures, *, loop=None, timeout=None, return_when=ALL_COMPLETED)

Wait for the Futures and coroutine objects given by the sequence futures to complete. Coroutines will be wrapped in Tasks. Returns two sets of Future: (done, pending).

import asyncio
import threading


async def generate():
    await asyncio.sleep(10)
    return 'Hello'


def run_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()


event_loop = asyncio.get_event_loop()
threading.Thread(target=lambda: run_loop(event_loop)).start()

c1 = generate()  # submitted at a random time
c2 = generate()  # submitted at a random time
c3 = generate()  # submitted at a random time
f1 = asyncio.run_coroutine_threadsafe(c1, event_loop)
f2 = asyncio.run_coroutine_threadsafe(c2, event_loop)
f3 = asyncio.run_coroutine_threadsafe(c3, event_loop)

all_futures = [f1, f2, f3]

# I'm doing something wrong in these 3 lines
waitable = asyncio.wait(all_futures, return_when=asyncio.FIRST_COMPLETED)
future = asyncio.run_coroutine_threadsafe(waitable, event_loop)
done, pending = future.result()  # This returns my TypeError exception

for d in done:
    print(d.result())

这个答案有助于回答这个问题:

asyncio.wait(...) 不能接受期货,只能接受尚未安排的协程和等待对象。正确的方法是使用回调。当未来完成时,它可以将自己添加到线程安全队列中,您可以从该队列中拉出。下面的例子解决了问题中的问题:

import asyncio
import threading
import queue
import random


async def generate(i):
    await asyncio.sleep(random.randint(5, 10))
    return 'Hello {}'.format(i)


def run_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()


def done(fut):
    q.put(fut)


event_loop = asyncio.get_event_loop()
threading.Thread(target=lambda: run_loop(event_loop)).start()
q = queue.Queue()

c1 = generate(1)
c2 = generate(2)
c3 = generate(3)
f1 = asyncio.run_coroutine_threadsafe(c1, event_loop)
f2 = asyncio.run_coroutine_threadsafe(c2, event_loop)
f3 = asyncio.run_coroutine_threadsafe(c3, event_loop)
f1.add_done_callback(lambda fut: q.put(fut))
f2.add_done_callback(lambda fut: q.put(fut))
f3.add_done_callback(lambda fut: q.put(fut))

print(q.get().result())
print(q.get().result())
print(q.get().result())

asyncio.wait 期望 asyncio 期货并在事件循环内工作。要等待多个 concurrent.futures 期货(在事件循环之外),请改用 concurrent.futures.wait

done, pending = concurrent.futures.wait(
    all_futures, return_when=concurrent.futures.FIRST_COMPLETED)

请注意,如果您可以访问底层的 asyncio 期货,您的想法就会奏效。例如(未经测试):

async def submit(coro):
    # submit the coroutine and return the asyncio task (future)
    return asyncio.create_task(coro)

# ...generate() as before

# note that we use result() to get to the asyncio futures:
f1 = asyncio.run_coroutine_threadsafe(submit(c1), event_loop).result()
f2 = asyncio.run_coroutine_threadsafe(submit(c2), event_loop).result()
f3 = asyncio.run_coroutine_threadsafe(submit(c3), event_loop).result()

# these should be waitable by submitting wait() to the event loop
done, pending = asyncio.run_coroutine_threadsafe(
    asyncio.wait([f1, f2, f3], return_when=asyncio.FIRST_COMPLETED)).result()