使 FastAPI WebSockets 的 CPU 绑定任务异步

Make an CPU-bound task asynchronous for FastAPI WebSockets

所以我有一个 CPU-bound long-运行 算法,我们称它为任务。 假设它看起来像这样:

def task(parameters):
  result = 0
  for _ in range(10):
    for _ in range(10):
      for _ in range(10):
        result += do_things()
  return result

@app.get('/')
def results(parameters: BodyModel):
    return task(parameters)

如果我将其封装在 def 路径操作函数中 一切正常,因为它是在不同的线程中启动的。我可以访问多个路径等。并发通过将我的 CPU 绑定任务推送到一个单独的线程来完成它的工作。但我现在想切换到 WebSockets,以传达中间结果。为此,我必须将我的整个事情标记为异步并将 WebSocket 传递到我的任务中。所以它看起来像这样:

async def task(parameters):
  result = 0
  for _ in range(10):
    for _ in range(10):
      for _ in range(10):
        intermediate_result = do_things()
        await parameters.websocket.send_text(intermediate_result)
        result += intermediate_result
  return result

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    while True:
        parameters = await websocket.receive_text()
        parameters.websocket = websocket
        result = await task(parameters)
        await websocket.send_text(result)

发送中间结果非常有效。但是现在我的算法阻止了 FastAPI,因为它本身并不是真正的异步。一旦我 post 发送到 '/ws' FastAPI 的消息被阻止并且在我的任务完成之前不会响应任何其他请求。

所以我需要一些建议

我尝试按照 所述使用 ProcessPoolExecuter,但不可能 pickle 协程,据我所知,我必须使我的任务成为协程(使用异步)才能使用 websocket.send_text() 在里面。

此外,我考虑过将中间结果存储在某个地方,创建一个 HTTP POST 来开始我的任务,然后使用另一个 WebSocket 连接来读取和发送中间结果。但是我也可以类似地启动一个后台任务并实现一个常规的 HTTP 轮询机制。但我也不想,主要是因为我打算使用 Google Cloud 运行 来限制所有连接关闭时的 CPU 。而且我认为教我的任务如何直接通过 WebSocket 进行通信是更好的做法。

我希望我的问题很清楚。这是我第一个使用 FastAPI 和异步的大型项目,之前没有真正使用过 AsyncIO。所以我可能只是错过了一些东西。感谢您的建议。

如果有人遇到这个问题,我现在将添加适合我的解决方案。

我在关注这个:https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor

关键是让它成为非阻塞的。因此,例如,而不是:

 # 1. Run in the default loop's executor:
result = await loop.run_in_executor(None, blocking_io)
print('default thread pool', result)

我移动等待并将代码更改为:

# 1. Run in the default loop's executor:
thread = loop.run_in_executor(None, blocking_io)
print('default thread pool', result)
while True:
    asyncio.sleep(1)
    websocket.send_text('status updates...'
    if internal_logger.blocking_stuff_finished:
        break
result = await thread
websocket.send_text('result:', result)
websocket.close()

这样我就可以将 cpu_bound 的东西放在一个单独的线程中,我不需要等待,一切正常。

制作自定义线程池也可以,但我们需要删除上下文管理器以使其成为非阻塞的。

# 2. Run in a custom thread pool:
with concurrent.futures.ThreadPoolExecutor() as pool:
    result = await loop.run_in_executor(pool, blocking_io)

然后会变成:

pool = concurrent.futures.ThreadPoolExecutor()
thread = loop.run_in_executor(pool, blocking_io)

理论上,这同样适用于 ProcessPoolExecutor,但需要做更多的工作,因为没有共享内存并且我的终止条件不会像上面描述的那样工作。

是的,我知道 cpu_bound 事情最好在不同的进程中完成,但将它移动到单独的线程对我来说不起作用,我确实喜欢共享内存 atm。