Python: Get reference to original task after ordering tasks by completion
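Question: after asyncio.as_completed yields a result, how do I get a reference to the original task?
This is basically the same as this C# question, except in Python.
Example of the problem: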
    # Takes a list of WebClient objects,
    # calls each one simultaneously,
    # and yields the results immediately as they arrive
    # to a synchronous caller.
    def yieldThingsAsTheyArrive(webClients):
        tasks = []
        for webClient in webClients:
            # This is what we want to get a reference to later:
            task = webClient.fetch_thing()  # start long-running request asynchronously
            tasks.append(task)
        loop = asyncio.get_event_loop()
        for future in asyncio.as_completed(tasks):
            # Since our caller is synchronous, wait until the task completes
            # so we can yield the final result instead of a future.
            thing = loop.run_until_complete(future)
            thing.originalWebClient = ???  # This is where we need a reference to the original webClient
            yield thing
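as_completed is peculiar in that it yields neither futures, like asyncio.wait, nor their results, like asyncio.gather. Instead, it yields coroutines that you need to await (in whatever way you like) to get the results in completion order. It cannot yield the futures you passed to it, because at that point it does not yet know which of them will complete next.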
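To make that concrete, here is a minimal sketch. The `work` coroutine and its delays are made up for illustration. It iterates over `as_completed` with a plain `for` loop and records whether each yielded object is one of the original tasks:

```python
import asyncio

async def work(name, delay):
    # Hypothetical stand-in for a long-running request.
    await asyncio.sleep(delay)
    return name

async def demo():
    tasks = [asyncio.ensure_future(work("slow", 0.2)),
             asyncio.ensure_future(work("fast", 0.05))]
    results = []
    yielded_is_original = []
    for aw in asyncio.as_completed(tasks):
        # aw is a fresh awaitable, not one of the task objects we passed in.
        yielded_is_original.append(any(aw is t for t in tasks))
        results.append(await aw)
    return results, yielded_is_original

results, yielded_is_original = asyncio.run(demo())
print(results, yielded_is_original)  # ['fast', 'slow'] [False, False]
```

The results arrive in completion order, but nothing in the loop tells you which task produced which result, which is exactly the problem in the question.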
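You can associate arbitrary data with a task by wrapping the task in another future whose result is the task object itself (to which you have attached your data). This is essentially equivalent to what the C# code does, only without the static-typing ceremony. A runnable example looks like this: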
    import asyncio

    async def first():
        await asyncio.sleep(5)
        return 'first'

    async def second():
        await asyncio.sleep(1)
        return 'second'

    async def third():
        await asyncio.sleep(3)
        return 'third'

    def ordinary_generator():
        # Create and install a loop explicitly; calling asyncio.get_event_loop()
        # without a running loop is deprecated in recent Python versions.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        wrappers = []
        for idx, coro in enumerate((first(), second(), third())):
            task = loop.create_task(coro)
            task.idx = idx + 1
            # Wrap the task in a future that completes when the
            # task does, but whose result is the task object itself.
            wrapper = loop.create_future()
            task.add_done_callback(wrapper.set_result)
            wrappers.append(wrapper)
        for x in asyncio.as_completed(wrappers):
            # yield completed tasks
            yield loop.run_until_complete(x)

    for task in ordinary_generator():
        print(task.result(), task.idx)
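Another option, and the one I recommend, is to replace the iteration over as_completed with a loop that calls asyncio.wait(return_when=FIRST_COMPLETED). This also hands you the futures as they complete, but needs no extra wrapping and results in slightly more idiomatic asyncio code. We call ensure_future on each coroutine to convert it to a future (loop.create_task, used below, does the same for coroutines), attach the data to it, and only then pass it to asyncio.wait(). Since wait returns those same futures, the attached data is still on them.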
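    def ordinary_generator():
        # Create the loop explicitly; calling asyncio.get_event_loop()
        # without a running loop is deprecated in recent Python versions.
        loop = asyncio.new_event_loop()
        pending = []
        for idx, coro in enumerate((first(), second(), third())):
            task = loop.create_task(coro)
            task.idx = idx + 1
            pending.append(task)
        while pending:
            # wait() returns the very task objects we passed in.
            done, pending = loop.run_until_complete(asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED))
            for task in done:
                yield task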
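Applied to the setup in the question, the same asyncio.wait loop could look roughly like the sketch below. `FakeWebClient`, its delays, and the function name `yield_things_as_they_arrive` are hypothetical stand-ins, since the real WebClient is not shown. Instead of attaching an attribute to each task, this variation keeps a dictionary from task to client, which works because wait hands back the same task objects:

```python
import asyncio

class FakeWebClient:
    """Hypothetical stand-in for the question's WebClient."""
    def __init__(self, name, delay):
        self.name = name
        self.delay = delay

    async def fetch_thing(self):
        await asyncio.sleep(self.delay)  # simulate a slow request
        return f"thing from {self.name}"

def yield_things_as_they_arrive(web_clients):
    loop = asyncio.new_event_loop()
    pending = set()
    try:
        # Remember which client each task came from.
        client_for = {loop.create_task(c.fetch_thing()): c
                      for c in web_clients}
        pending = set(client_for)
        while pending:
            done, pending = loop.run_until_complete(asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED))
            for task in done:
                yield task.result(), client_for[task]
    finally:
        # If the caller stops early, cancel whatever is still running.
        for task in pending:
            task.cancel()
        if pending:
            loop.run_until_complete(asyncio.wait(pending))
        loop.close()

clients = [FakeWebClient("a", 0.2), FakeWebClient("b", 0.05)]
arrived = [(thing, client.name)
           for thing, client in yield_things_as_they_arrive(clients)]
print(arrived)  # [('thing from b', 'b'), ('thing from a', 'a')]
```

The dictionary avoids setting ad hoc attributes on task objects; attaching `task.idx`-style attributes as in the examples above would work just as well here.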