python asyncio.gather 与 asyncio.as_completed 当 IO 任务后跟 CPU 绑定任务时

python asyncio.gather vs asyncio.as_completed when IO task followed by CPU-bound task

我有一个程序工作流程如下:1. IO-bound(网页获取)-> 2. cpu-bound(处理信息)-> 3. IO-bound(将结果写入数据库).

我目前正在使用 aiohttp 获取网页。我目前正在使用 asyncio.as_completed 来收集第 1 步的任务,并在完成后将它们传递给第 2 步。我担心的是,这可能会消耗 cpu 资源并阻止步骤 2 中的程序流,从而干扰步骤 1 任务的完成。

我尝试使用 ProcessPoolExecutor 将第 2 步任务分包给其他进程,但第 2 步任务使用不可 pickleable 的数据结构和函数。我已经尝试过 ThreadPoolExecutor,虽然它有效(例如它没有崩溃),但我的理解是对 CPU 绑定的任务这样做会适得其反。

因为工作流有一个中间 cpu 绑定任务,使用 asyncio.gather(而不是 asyncio.as_completed)来完成之前的所有步骤 1 流程是否更有效继续第 2 步?

示例 asyncio.as_completed 代码:

async with ClientSession() as session:
    tasks = {self.fetch(session, url) for url in self.urls}
    for task in asyncio.as_completed(tasks):
        raw_data = await asyncio.shield(task)
        data = self.extract_data(*raw_data)
        await self.store_data(data)

示例 asyncio.gather 代码:

async with ClientSession() as session:
    tasks = {self.fetch(session, url) for url in self.urls}
    results = await asyncio.gather(*tasks)
for result in results:
    data = self.extract_data(*result)
    await self.store_data(data)

对有限样本的初步测试表明 as_completed 比 gather 更有效:~2.98s (as_completed) vs ~3.15s (gather)。但是是否存在一个 asyncio 概念问题,会偏向于一种解决方案而不是另一种解决方案?

“我已经尝试过 ThreadPoolExecutor,[...] 据我了解,对 CPU 绑定的任务这样做会适得其反。” - 从某种意义上说,这是适得其反的,你不会有两个这样的问题 运行ning Python 代码并行,使用多个 CPU 核心 - 但除此之外, 它会工作释放您的 asyncio 循环以继续工作,如果一次只为一项任务咀嚼代码。

如果您不能将事情 pickle 到子进程,运行在 ThreadPoolExecutor 中绑定 CPU 任务就足够了。

否则,只需在 cpu 代码中添加一些 await asyncio.sleep(0) (在循环内)和 运行 它们通常作为协程:这足以满足 cpu 限制不锁定异步循环的任务。