我能否获得在 wait_for() 中被 TimeOut 中断的 asyncio 'shielded' 任务的结果
Can I get result of the asyncio 'shielded' task that was interrupted in wait_for() by TimeOut
请帮助我理解一些 asyncio 的东西。
我想知道下一步是否可行:
我有同步功能,例如在远程 API 中创建一些数据(API 可以 returns 成功或失败):
def sync_func(url):
... do something
return result
我有 运行 执行程序中同步操作的协程:
async def coro_func(url)
loop = asyncio.get_event_loop()
fn = functools.partial(sync_func, url)
return await loop.run_in_executor(None, fn)
接下来我想做的事
- 如果远程 API 没有响应 1 秒,我想开始下一个 url 处理,但我想知道第一个任务的结果(当 API最终将发送响应)被超时中断。我将 coro_func() 包裹在一个 shield() 中以避免它被取消。但是不知道如何在...之后检查结果
list_of_urls = [url1, ... urlN]
map_of_task_results = {}
async def task_processing():
for url in list_of_urls:
res = asyncio.wait_for(shield(coro_func(url), timeout=1))
if res == 'success':
return res
break
else:
map_of_task_results[url] = res
return "all tasks were processed"
P.S。当我尝试访问 shield(coro) 结果时 - 它有 CancelledError 异常..但我希望可能有结果,因为我 'shielded' task.
try:
task = asyncio.shield(coro_func(url))
result = await asyncio.wait_for(task, timeout=API_TIMEOUT)
except TimeoutError as e:
import ipdb; ipdb.set_trace()
pending_tasks[api_details['api_url']] = task
ipdb> task
<Future cancelled created at
/usr/lib/python3.6/asyncio/base_events.py:276>
ipdb> task.exception
<built-in method exception of _asyncio.Future object at 0x7f7d41eeb588>
ipdb> task.exception()
*** concurrent.futures._base.CancelledError
`
如果您在屏蔽协程之前从协程中创建了一个未来(任务),您以后可以随时检查它。例如:
coro_task = loop.create_task(coro_func(url))
try:
result = await asyncio.wait_for(asyncio.shield(coro_task), API_TIMEOUT)
except asyncio.TimeoutError:
pending_tasks[api_details['api_url']] = coro_task
如果是,您可以使用 coro_task.done()
to check if the task has completed in the meantime and call result()
,如果不是,您可以使用 await
。如果需要,您甚至可以再次使用 shield
/wait_for
,依此类推。
好的,谢谢@user4815162342 我想出了如何处理那些被超时中断的任务 - 通常我的解决方案现在看起来像:
def sync_func(url):
... do something probably long
return result
async def coro_func(url)
loop = asyncio.get_event_loop()
fn = functools.partial(sync_func, url)
return await loop.run_in_executor(None, fn)
async def waiter(pending_tasks):
count = 60
while not all(map(lambda x: x.done(), pending_tasks.values())) and count > 0:
logger.info("Waiting for pending tasks..")
await asyncio.sleep(1)
count -= 1
# Finally process results those were in pending
print([task.result() for task in pending_tasks.values()])
async def task_processing(...):
list_of_urls = [url1, ... urlN]
pending_tasks = {}
for url in list_of_urls:
try:
task = asyncio.Task(coro_func(url))
result = await asyncio.wait_for(asyncio.shield(task), timeout=API_TIMEOUT)
except TimeoutError as e:
pending_tasks[url] = task
if not result or result != 'success':
continue
else:
print('Do something good here on first fast success, response to user ASAP in my case.')
break
# here start of pending task processing
loop = asyncio.get_event_loop()
loop.create_task(waiter(pending_tasks))
所以我正在收集那些被字典映射对象中的 concurrent.future.TimeoutError 中断的任务,然后我 运行 带有 waiter() coro 的任务试图等待 60 秒,而挂起的任务将得到状态完成或 60 秒将 运行 出来。
另外的话,我的代码放到了Tornado的RequestHandler中,Tornado使用了asyncio event loop。
因此,在 N 次尝试从 url 列表中的一个 url 获得快速响应后,我可以回答用户并且不会丢失那些因 TimeoutError 启动和中断的任务的结果。 (我可以在回复用户后处理它们,所以这是我的主要想法)
我希望它能为寻找相同内容的人节省很多时间:)
请帮助我理解一些 asyncio 的东西。 我想知道下一步是否可行:
我有同步功能,例如在远程 API 中创建一些数据(API 可以 returns 成功或失败):
def sync_func(url):
... do something
return result
我有 运行 执行程序中同步操作的协程:
async def coro_func(url)
loop = asyncio.get_event_loop()
fn = functools.partial(sync_func, url)
return await loop.run_in_executor(None, fn)
接下来我想做的事
- 如果远程 API 没有响应 1 秒,我想开始下一个 url 处理,但我想知道第一个任务的结果(当 API最终将发送响应)被超时中断。我将 coro_func() 包裹在一个 shield() 中以避免它被取消。但是不知道如何在...之后检查结果
list_of_urls = [url1, ... urlN]
map_of_task_results = {}
async def task_processing():
for url in list_of_urls:
res = asyncio.wait_for(shield(coro_func(url), timeout=1))
if res == 'success':
return res
break
else:
map_of_task_results[url] = res
return "all tasks were processed"
P.S。当我尝试访问 shield(coro) 结果时 - 它有 CancelledError 异常..但我希望可能有结果,因为我 'shielded' task.
try:
task = asyncio.shield(coro_func(url))
result = await asyncio.wait_for(task, timeout=API_TIMEOUT)
except TimeoutError as e:
import ipdb; ipdb.set_trace()
pending_tasks[api_details['api_url']] = task
ipdb> task
<Future cancelled created at
/usr/lib/python3.6/asyncio/base_events.py:276>
ipdb> task.exception
<built-in method exception of _asyncio.Future object at 0x7f7d41eeb588>
ipdb> task.exception()
*** concurrent.futures._base.CancelledError
`
如果您在屏蔽协程之前从协程中创建了一个未来(任务),您以后可以随时检查它。例如:
coro_task = loop.create_task(coro_func(url))
try:
result = await asyncio.wait_for(asyncio.shield(coro_task), API_TIMEOUT)
except asyncio.TimeoutError:
pending_tasks[api_details['api_url']] = coro_task
如果是,您可以使用 coro_task.done()
to check if the task has completed in the meantime and call result()
,如果不是,您可以使用 await
。如果需要,您甚至可以再次使用 shield
/wait_for
,依此类推。
好的,谢谢@user4815162342 我想出了如何处理那些被超时中断的任务 - 通常我的解决方案现在看起来像:
def sync_func(url):
... do something probably long
return result
async def coro_func(url)
loop = asyncio.get_event_loop()
fn = functools.partial(sync_func, url)
return await loop.run_in_executor(None, fn)
async def waiter(pending_tasks):
count = 60
while not all(map(lambda x: x.done(), pending_tasks.values())) and count > 0:
logger.info("Waiting for pending tasks..")
await asyncio.sleep(1)
count -= 1
# Finally process results those were in pending
print([task.result() for task in pending_tasks.values()])
async def task_processing(...):
list_of_urls = [url1, ... urlN]
pending_tasks = {}
for url in list_of_urls:
try:
task = asyncio.Task(coro_func(url))
result = await asyncio.wait_for(asyncio.shield(task), timeout=API_TIMEOUT)
except TimeoutError as e:
pending_tasks[url] = task
if not result or result != 'success':
continue
else:
print('Do something good here on first fast success, response to user ASAP in my case.')
break
# here start of pending task processing
loop = asyncio.get_event_loop()
loop.create_task(waiter(pending_tasks))
所以我正在收集那些被字典映射对象中的 concurrent.future.TimeoutError 中断的任务,然后我 运行 带有 waiter() coro 的任务试图等待 60 秒,而挂起的任务将得到状态完成或 60 秒将 运行 出来。
另外的话,我的代码放到了Tornado的RequestHandler中,Tornado使用了asyncio event loop。 因此,在 N 次尝试从 url 列表中的一个 url 获得快速响应后,我可以回答用户并且不会丢失那些因 TimeoutError 启动和中断的任务的结果。 (我可以在回复用户后处理它们,所以这是我的主要想法)
我希望它能为寻找相同内容的人节省很多时间:)