我能否获得在 wait_for() 中被 TimeOut 中断的 asyncio 'shielded' 任务的结果

Can I get result of the asyncio 'shielded' task that was interrupted in wait_for() by TimeOut

请帮助我理解一些 asyncio 的东西。 我想知道下一步是否可行:

我有同步功能,例如在远程 API 中创建一些数据(API 可以 returns 成功或失败):

def sync_func(url):
   ... do something
   return result

我有 运行 执行程序中同步操作的协程:

async def coro_func(url)
    loop = asyncio.get_event_loop()
    fn = functools.partial(sync_func, url)
    return await loop.run_in_executor(None, fn)

接下来我想做的事

  1. 如果远程 API 没有响应 1 秒,我想开始下一个 url 处理,但我想知道第一个任务的结果(当 API最终将发送响应)被超时中断。我将 coro_func() 包裹在一个 shield() 中以避免它被取消。但是不知道如何在...之后检查结果

list_of_urls = [url1, ... urlN] map_of_task_results = {} async def task_processing(): for url in list_of_urls: res = asyncio.wait_for(shield(coro_func(url), timeout=1)) if res == 'success': return res break else: map_of_task_results[url] = res return "all tasks were processed"

P.S。当我尝试访问 shield(coro) 结果时 - 它有 CancelledError 异常..但我希望可能有结果,因为我 'shielded' task.

try: task = asyncio.shield(coro_func(url)) result = await asyncio.wait_for(task, timeout=API_TIMEOUT) except TimeoutError as e: import ipdb; ipdb.set_trace() pending_tasks[api_details['api_url']] = task

ipdb> task
<Future cancelled created at 
/usr/lib/python3.6/asyncio/base_events.py:276>
ipdb> task.exception
<built-in method exception of _asyncio.Future object at 0x7f7d41eeb588>
ipdb> task.exception()

*** concurrent.futures._base.CancelledError

`

如果您在屏蔽协程之前从协程中创建了一个未来(任务),您以后可以随时检查它。例如:

coro_task = loop.create_task(coro_func(url))
try:
    result = await asyncio.wait_for(asyncio.shield(coro_task), API_TIMEOUT)
except asyncio.TimeoutError:
    pending_tasks[api_details['api_url']] = coro_task

如果是,您可以使用 coro_task.done() to check if the task has completed in the meantime and call result(),如果不是,您可以使用 await。如果需要,您甚至可以再次使用 shield/wait_for,依此类推。

好的,谢谢@user4815162342 我想出了如何处理那些被超时中断的任务 - 通常我的解决方案现在看起来像:

def sync_func(url):
   ... do something probably long
   return result

async def coro_func(url)
    loop = asyncio.get_event_loop()
    fn = functools.partial(sync_func, url)
    return await loop.run_in_executor(None, fn)

async def waiter(pending_tasks):
    count = 60
    while not all(map(lambda x: x.done(), pending_tasks.values())) and count > 0:
        logger.info("Waiting for pending tasks..")
        await asyncio.sleep(1)
        count -= 1

    # Finally process results those were in pending 
    print([task.result() for task in pending_tasks.values()])

async def task_processing(...):
    list_of_urls = [url1, ... urlN]
    pending_tasks = {}

    for url in list_of_urls:
        try:
            task = asyncio.Task(coro_func(url))
            result = await asyncio.wait_for(asyncio.shield(task), timeout=API_TIMEOUT)
        except TimeoutError as e:
            pending_tasks[url] = task

        if not result or result != 'success':
            continue
        else:
            print('Do something good here on first fast success, response to user ASAP in my case.')
            break

    # here start of pending task processing
    loop = asyncio.get_event_loop()
    loop.create_task(waiter(pending_tasks))

所以我正在收集那些被字典映射对象中的 concurrent.future.TimeoutError 中断的任务,然后我 运行 带有 waiter() coro 的任务试图等待 60 秒,而挂起的任务将得到状态完成或 60 秒将 运行 出来。

另外的话,我的代码放到了Tornado的RequestHandler中,Tornado使用了asyncio event loop。 因此,在 N 次尝试从 url 列表中的一个 url 获得快速响应后,我可以回答用户并且不会丢失那些因 TimeoutError 启动和中断的任务的结果。 (我可以在回复用户后处理它们,所以这是我的主要想法)

我希望它能为寻找相同内容的人节省很多时间:)