How to properly use asyncio.FIRST_COMPLETED
The problem is that even though I use return_when=asyncio.FIRST_COMPLETED in await asyncio.wait(), I still get a RuntimeError: Event loop is closed error.
My code:
async def task_manager():
    tasks = [grab_proxy() for _ in range(10)]
    finished, unfinished = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for x in finished:
        result = x.result()
        if result:
            return result


def get_proxy_loop():
    loop = asyncio.new_event_loop()
    proxy = loop.run_until_complete(task_manager())
    loop.close()
    return proxy


if __name__ == '__main__':
    p = get_proxy_loop()
    print(type(p))
    print(p)
Expected behaviour:
return_when=asyncio.FIRST_COMPLETED should kill all the remaining tasks "under the hood" once the first result is returned.
But in fact some tasks are still pending after the first result is returned. After I close the loop in get_proxy_loop() and access the result in __main__, those leftover tasks raise RuntimeError: Event loop is closed.
Console output:
<class 'str'>
78.32.35.21:55075
Task was destroyed but it is pending!
task: <Task pending coro=<grab_proxy() running at /home/pata/PycharmProjects/accs_farm/accs_farm/proxy_grabber.py:187> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fc5187a8798>()]>>
Exception ignored in: <coroutine object grab_proxy at 0x7fc5150aae60>
Traceback (most recent call last):
File "/home/pata/proxy_grabber.py", line 187, in grab_proxy
proxy = await async_get_proxy()
File "/home/pata/proxy_grabber.py", line 138, in async_get_proxy
async with session.get(provider_url, timeout=5, params=params) as r:
File "/home/pata/venvs/test_celery/lib/python3.6/site-packages/aiohttp/client.py", line 855, in __aenter__
self._resp = await self._coro
File "/home/pata/venvs/test_celery/lib/python3.6/site-packages/aiohttp/client.py", line 396, in _request
conn.close()
File "/home/pata/venvs/test_celery/lib/python3.6/site-packages/aiohttp/connector.py", line 110, in close
self._key, self._protocol, should_close=True)
File "/home/pata/venvs/test_celery/lib/python3.6/site-packages/aiohttp/connector.py", line 547, in _release
Event loop is closed
transport = protocol.close()
File "/home/pata/venvs/test_celery/lib/python3.6/site-packages/aiohttp/client_proto.py", line 54, in close
transport.close()
File "/usr/lib/python3.6/asyncio/selector_events.py", line 621, in close
self._loop.call_soon(self._call_connection_lost, None)
File "/usr/lib/python3.6/asyncio/base_events.py", line 580, in call_soon
self._check_closed()
File "/usr/lib/python3.6/asyncio/base_events.py", line 366, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
...
...
Task was destroyed but it is pending!
task: <Task pending coro=<grab_proxy() done, defined at /home/pata/proxy_grabber.py:183> wait_for=<Future pending cb=[BaseSelectorEventLoop._sock_connect_done(11)(), <TaskWakeupMethWrapper object at 0x7fc514d15e28>()]>>
Task was destroyed but it is pending!
task: <Task pending coro=<grab_proxy() done, defined at /proxy_grabber.py:183> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fc5187a8588>()]>>
Event loop is closed
Process finished with exit code 0
The asyncio.wait(..., return_when=asyncio.FIRST_COMPLETED) coroutine returns when at least one task has completed. The other tasks can still be active. It is not the job of asyncio.wait() to cancel those tasks for you. The use case for asyncio.wait(..., return_when=asyncio.FIRST_COMPLETED) is to let you monitor tasks and act on their results as they complete; you would normally call it repeatedly until all your tasks are done.
From the asyncio.wait() documentation:
Run awaitable objects in the aws set concurrently and block until the condition specified by return_when.
[...]
return_when indicates when this function should return. It must be one of the following constants:
FIRST_COMPLETED
The function will return when any future finishes or is cancelled.
[...]
Unlike wait_for(), wait() does not cancel the futures when a timeout occurs.
The documentation is explicit in stating that it will not cancel the futures for you, not even when you set a timeout (if you do set a timeout and it expires, the first done set is simply empty and the tasks are all still active, listed in the second pending set).
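A minimal sketch of that timeout behaviour (not part of the original answer; the slow() coroutine and the 0.1 second timeout are invented for illustration): when the timeout expires before anything completes, done comes back empty and everything is still sitting in pending, nothing has been cancelled for you.

import asyncio

async def slow(n):
    # stand-in for a request that takes longer than the timeout
    await asyncio.sleep(n)
    return n

async def main():
    tasks = [asyncio.ensure_future(slow(5)) for _ in range(3)]
    done, pending = await asyncio.wait(
        tasks, timeout=0.1, return_when=asyncio.FIRST_COMPLETED)
    print(len(done), len(pending))  # prints "0 3": nothing was cancelled for us
    for task in pending:            # clean up explicitly, exactly as the answer below does
        task.cancel()
    await asyncio.wait(pending)

asyncio.run(main())  # asyncio.run() requires Python 3.7+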
If you need the unfinished tasks to be cancelled, do so explicitly:
while tasks:
    finished, unfinished = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for x in finished:
        result = x.result()
        if result:
            # cancel the other tasks, we have a result. We need to wait for the cancellations
            # to propagate.
            for task in unfinished:
                task.cancel()
            await asyncio.wait(unfinished)
            return result
    tasks = unfinished
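One caveat worth noting (my addition, not part of the original answer): asyncio.wait() raises ValueError when handed an empty set, so if the winning result can come from the very last remaining task, guard the cancellation step:

if unfinished:  # only wait on cancellations if anything is actually left
    for task in unfinished:
        task.cancel()
    await asyncio.wait(unfinished)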
A demo with some extra prints and randomised tasks:
>>> import asyncio
>>> import random
>>> async def grab_proxy(taskid):
...     await asyncio.sleep(random.uniform(0.1, 1))
...     result = random.choice([None, None, None, 'result'])
...     print(f'Task #{taskid} producing result {result!r}')
...     return result
...
>>> async def task_manager():
...     tasks = [grab_proxy(i) for i in range(10)]
...     while tasks:
...         finished, unfinished = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
...         for x in finished:
...             result = x.result()
...             print(f"Finished task produced {result!r}")
...             if result:
...                 # cancel the other tasks, we have a result. We need to wait for the cancellations
...                 # to propagate.
...                 print(f"Cancelling {len(unfinished)} remaining tasks")
...                 for task in unfinished:
...                     task.cancel()
...                 await asyncio.wait(unfinished)
...                 return result
...         tasks = unfinished
...
>>>
>>> def get_proxy_loop():
...     loop = asyncio.new_event_loop()
...     proxy = loop.run_until_complete(task_manager())
...     loop.close()
...     return proxy
...
>>> get_proxy_loop()
Task #7 producing result None
Finished task produced None
Task #0 producing result 'result'
Finished task produced 'result'
Cancelling 8 remaining tasks
'result'
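A closing note, not taken from the original answer: on Python 3.7 and later you can let asyncio.run() create and close the event loop for you. It also cancels whatever is still pending before the loop shuts down, which sidesteps the RuntimeError: Event loop is closed seen in the question. A sketch, assuming the task_manager() defined above:

import asyncio

def get_proxy_loop():
    # asyncio.run() creates a fresh loop, runs the coroutine, cancels any
    # leftover tasks and closes the loop afterwards (Python 3.7+).
    return asyncio.run(task_manager())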