Python Asyncio 任务 运行 没有 gather()
Python Asyncio task is running without gather()
我试图重现并更好地理解 Cristian Garcia 的 this 博客 post 中的任务池示例,我 运行 得到了一个非常有趣的结果。
这是我使用的两个脚本。我用 运行dom 睡眠调用
交换了实际的网络请求
#task_pool.py
import asyncio
class TaskPool(object):
def __init__(self, workers):
self._semaphore = asyncio.Semaphore(workers)
self._tasks = set()
async def put(self, coro):
await self._semaphore.acquire()
task = asyncio.create_task(coro)
self._tasks.add(task)
task.add_done_callback(self._on_task_done)
def _on_task_done(self, task):
self._tasks.remove(task)
self._semaphore.release()
async def join(self):
await asyncio.gather(*self._tasks)
async def __aenter__(self):
return self
def __aexit__(self, exc_type, exc, tb):
print("aexit triggered")
return self.join()
和
# main.py
import asyncio
import sys
from task_pool import TaskPool
import random
limit = 3
async def fetch(i):
timereq = random.randrange(5)
print("request: {} start, delay: {}".format(i, timereq))
await asyncio.sleep(timereq)
print("request: {} end".format(i))
return (timereq,i)
async def _main(total_requests):
async with TaskPool(limit) as tasks:
for i in range(total_requests):
await tasks.put(fetch(i))
loop = asyncio.get_event_loop()
loop.run_until_complete(_main(int(sys.argv[1])))
python 3.7.1 上的命令 main.py 10
产生以下结果。
request: 0 start, delay: 3
request: 1 start, delay: 3
request: 2 start, delay: 3
request: 0 end
request: 1 end
request: 2 end
request: 3 start, delay: 4
request: 4 start, delay: 1
request: 5 start, delay: 0
request: 5 end
request: 6 start, delay: 1
request: 4 end
request: 6 end
request: 7 start, delay: 1
request: 8 start, delay: 4
request: 7 end
aexit triggered
request: 9 start, delay: 1
request: 9 end
request: 3 end
request: 8 end
根据这个结果我有几个问题。
- 在上下文管理器退出并触发
__aexit__
之前,我不会期望任务 运行,因为这是 asyncio.gather
的唯一触发器。然而,打印语句强烈表明 fetch
作业甚至在 aexit
之前就已经发生了。到底发生了什么事?任务是 运行ning 吗?如果是这样,是什么开始了他们?
- 与 (1) 有关。为什么上下文管理器在所有作业完成 return 之前退出?
fetch
作业应该 return 一个元组。我怎样才能访问这个值?对于基于 Web 的应用程序,我想开发人员可能希望对网站编辑的数据 return 进行操作。
非常感谢任何帮助!
调用 create_task
后任务立即启动。
直接来自文档,第一行:
Wrap the coro coroutine into a Task and schedule its execution.
不应该,但是。查看您问题中的代码:
def __aexit__(self, exc_type, exc, tb):
print("aexit triggered")
return self.join()
存在三个问题:
这是一个常规的同步函数。将其更改为 async def
并添加必需的 await
以调用 self.join()
。在这里你不调用 join
你只是创建任务但从来没有 运行 它。您的 python 肯定会抱怨您从不等待任务。 绝不能忽略这些警告,因为它们意味着您的程序中出现了严重错误。
[edit:] 正如 user4815162342 在下面指出的那样,您编写的结构实际上可以工作,尽管可能不是出于预期的原因——它工作是因为协程函数 return 通过调用 self.join()
而无需等待它将被 returned 并像它是 aexit 自己的一样使用。你不想要这个,让它异步并等待。
一旦修复,__aexit__
将打印 "aexit triggered" 并且 然后 调用 join
,等待任务去完成。因此,来自尚未完成的任务的消息将出现在 "aexit triggered" 消息之后。
__aexit__
的 return 值被忽略,除非退出是因为引发了异常。在这种情况下,return True
将吞下异常。删除 return
所以那部分,固定的:
async def __aexit__(self, exc_type, exc, tb):
print("aexit triggered")
await self.join()
print("aexit completed")
您的 TaskPool
必须使任务结果可用。这是你的设计,python 不会在引擎盖下做任何魔术。根据您所拥有的,一个简单的方法是 join
将 gather
的结果存储为任务池的一个属性。
我试图重现并更好地理解 Cristian Garcia 的 this 博客 post 中的任务池示例,我 运行 得到了一个非常有趣的结果。
这是我使用的两个脚本。我用 运行dom 睡眠调用
交换了实际的网络请求#task_pool.py
import asyncio
class TaskPool(object):
def __init__(self, workers):
self._semaphore = asyncio.Semaphore(workers)
self._tasks = set()
async def put(self, coro):
await self._semaphore.acquire()
task = asyncio.create_task(coro)
self._tasks.add(task)
task.add_done_callback(self._on_task_done)
def _on_task_done(self, task):
self._tasks.remove(task)
self._semaphore.release()
async def join(self):
await asyncio.gather(*self._tasks)
async def __aenter__(self):
return self
def __aexit__(self, exc_type, exc, tb):
print("aexit triggered")
return self.join()
和
# main.py
import asyncio
import sys
from task_pool import TaskPool
import random
limit = 3
async def fetch(i):
timereq = random.randrange(5)
print("request: {} start, delay: {}".format(i, timereq))
await asyncio.sleep(timereq)
print("request: {} end".format(i))
return (timereq,i)
async def _main(total_requests):
async with TaskPool(limit) as tasks:
for i in range(total_requests):
await tasks.put(fetch(i))
loop = asyncio.get_event_loop()
loop.run_until_complete(_main(int(sys.argv[1])))
python 3.7.1 上的命令 main.py 10
产生以下结果。
request: 0 start, delay: 3
request: 1 start, delay: 3
request: 2 start, delay: 3
request: 0 end
request: 1 end
request: 2 end
request: 3 start, delay: 4
request: 4 start, delay: 1
request: 5 start, delay: 0
request: 5 end
request: 6 start, delay: 1
request: 4 end
request: 6 end
request: 7 start, delay: 1
request: 8 start, delay: 4
request: 7 end
aexit triggered
request: 9 start, delay: 1
request: 9 end
request: 3 end
request: 8 end
根据这个结果我有几个问题。
- 在上下文管理器退出并触发
__aexit__
之前,我不会期望任务 运行,因为这是asyncio.gather
的唯一触发器。然而,打印语句强烈表明fetch
作业甚至在aexit
之前就已经发生了。到底发生了什么事?任务是 运行ning 吗?如果是这样,是什么开始了他们? - 与 (1) 有关。为什么上下文管理器在所有作业完成 return 之前退出?
fetch
作业应该 return 一个元组。我怎样才能访问这个值?对于基于 Web 的应用程序,我想开发人员可能希望对网站编辑的数据 return 进行操作。
非常感谢任何帮助!
调用
create_task
后任务立即启动。直接来自文档,第一行:
Wrap the coro coroutine into a Task and schedule its execution.
不应该,但是。查看您问题中的代码:
def __aexit__(self, exc_type, exc, tb): print("aexit triggered") return self.join()
存在三个问题:
这是一个常规的同步函数。将其更改为
async def
并添加必需的await
以调用self.join()
。在这里你不调用join
你只是创建任务但从来没有 运行 它。您的 python 肯定会抱怨您从不等待任务。 绝不能忽略这些警告,因为它们意味着您的程序中出现了严重错误。[edit:] 正如 user4815162342 在下面指出的那样,您编写的结构实际上可以工作,尽管可能不是出于预期的原因——它工作是因为协程函数 return 通过调用
self.join()
而无需等待它将被 returned 并像它是 aexit 自己的一样使用。你不想要这个,让它异步并等待。一旦修复,
__aexit__
将打印 "aexit triggered" 并且 然后 调用join
,等待任务去完成。因此,来自尚未完成的任务的消息将出现在 "aexit triggered" 消息之后。__aexit__
的 return 值被忽略,除非退出是因为引发了异常。在这种情况下,return True
将吞下异常。删除return
所以那部分,固定的:
async def __aexit__(self, exc_type, exc, tb): print("aexit triggered") await self.join() print("aexit completed")
您的
TaskPool
必须使任务结果可用。这是你的设计,python 不会在引擎盖下做任何魔术。根据您所拥有的,一个简单的方法是join
将gather
的结果存储为任务池的一个属性。