使 FastAPI WebSockets 的 CPU 绑定任务异步
Make an CPU-bound task asynchronous for FastAPI WebSockets
所以我有一个 CPU-bound long-运行 算法,我们称它为任务。
假设它看起来像这样:
def task(parameters):
result = 0
for _ in range(10):
for _ in range(10):
for _ in range(10):
result += do_things()
return result
@app.get('/')
def results(parameters: BodyModel):
return task(parameters)
如果我将其封装在 def
路径操作函数中 一切正常,因为它是在不同的线程中启动的。我可以访问多个路径等。并发通过将我的 CPU 绑定任务推送到一个单独的线程来完成它的工作。但我现在想切换到 WebSockets,以传达中间结果。为此,我必须将我的整个事情标记为异步并将 WebSocket 传递到我的任务中。所以它看起来像这样:
async def task(parameters):
result = 0
for _ in range(10):
for _ in range(10):
for _ in range(10):
intermediate_result = do_things()
await parameters.websocket.send_text(intermediate_result)
result += intermediate_result
return result
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
while True:
parameters = await websocket.receive_text()
parameters.websocket = websocket
result = await task(parameters)
await websocket.send_text(result)
发送中间结果非常有效。但是现在我的算法阻止了 FastAPI,因为它本身并不是真正的异步。一旦我 post 发送到 '/ws' FastAPI 的消息被阻止并且在我的任务完成之前不会响应任何其他请求。
所以我需要一些建议
- a) 从同步 CPU 绑定任务中发送 WebSocket 消息(我没有找到同步 send_text 替代方案)所以我可以使用
def
或
- b) 如何使我的 CPU 绑定真正异步,以便在我使用
async def
. 时它不再阻塞任何东西
我尝试按照 所述使用 ProcessPoolExecuter,但不可能 pickle 协程,据我所知,我必须使我的任务成为协程(使用异步)才能使用 websocket.send_text()
在里面。
此外,我考虑过将中间结果存储在某个地方,创建一个 HTTP POST 来开始我的任务,然后使用另一个 WebSocket 连接来读取和发送中间结果。但是我也可以类似地启动一个后台任务并实现一个常规的 HTTP 轮询机制。但我也不想,主要是因为我打算使用 Google Cloud 运行 来限制所有连接关闭时的 CPU 。而且我认为教我的任务如何直接通过 WebSocket 进行通信是更好的做法。
我希望我的问题很清楚。这是我第一个使用 FastAPI 和异步的大型项目,之前没有真正使用过 AsyncIO。所以我可能只是错过了一些东西。感谢您的建议。
如果有人遇到这个问题,我现在将添加适合我的解决方案。
我在关注这个:https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor
关键是让它成为非阻塞的。因此,例如,而不是:
# 1. Run in the default loop's executor:
result = await loop.run_in_executor(None, blocking_io)
print('default thread pool', result)
我移动等待并将代码更改为:
# 1. Run in the default loop's executor:
thread = loop.run_in_executor(None, blocking_io)
print('default thread pool', result)
while True:
asyncio.sleep(1)
websocket.send_text('status updates...'
if internal_logger.blocking_stuff_finished:
break
result = await thread
websocket.send_text('result:', result)
websocket.close()
这样我就可以将 cpu_bound 的东西放在一个单独的线程中,我不需要等待,一切正常。
制作自定义线程池也可以,但我们需要删除上下文管理器以使其成为非阻塞的。
# 2. Run in a custom thread pool:
with concurrent.futures.ThreadPoolExecutor() as pool:
result = await loop.run_in_executor(pool, blocking_io)
然后会变成:
pool = concurrent.futures.ThreadPoolExecutor()
thread = loop.run_in_executor(pool, blocking_io)
理论上,这同样适用于 ProcessPoolExecutor,但需要做更多的工作,因为没有共享内存并且我的终止条件不会像上面描述的那样工作。
是的,我知道 cpu_bound 事情最好在不同的进程中完成,但将它移动到单独的线程对我来说不起作用,我确实喜欢共享内存 atm。
所以我有一个 CPU-bound long-运行 算法,我们称它为任务。 假设它看起来像这样:
def task(parameters):
result = 0
for _ in range(10):
for _ in range(10):
for _ in range(10):
result += do_things()
return result
@app.get('/')
def results(parameters: BodyModel):
return task(parameters)
如果我将其封装在 def
路径操作函数中 一切正常,因为它是在不同的线程中启动的。我可以访问多个路径等。并发通过将我的 CPU 绑定任务推送到一个单独的线程来完成它的工作。但我现在想切换到 WebSockets,以传达中间结果。为此,我必须将我的整个事情标记为异步并将 WebSocket 传递到我的任务中。所以它看起来像这样:
async def task(parameters):
result = 0
for _ in range(10):
for _ in range(10):
for _ in range(10):
intermediate_result = do_things()
await parameters.websocket.send_text(intermediate_result)
result += intermediate_result
return result
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
while True:
parameters = await websocket.receive_text()
parameters.websocket = websocket
result = await task(parameters)
await websocket.send_text(result)
发送中间结果非常有效。但是现在我的算法阻止了 FastAPI,因为它本身并不是真正的异步。一旦我 post 发送到 '/ws' FastAPI 的消息被阻止并且在我的任务完成之前不会响应任何其他请求。
所以我需要一些建议
- a) 从同步 CPU 绑定任务中发送 WebSocket 消息(我没有找到同步 send_text 替代方案)所以我可以使用
def
或 - b) 如何使我的 CPU 绑定真正异步,以便在我使用
async def
. 时它不再阻塞任何东西
我尝试按照 websocket.send_text()
在里面。
此外,我考虑过将中间结果存储在某个地方,创建一个 HTTP POST 来开始我的任务,然后使用另一个 WebSocket 连接来读取和发送中间结果。但是我也可以类似地启动一个后台任务并实现一个常规的 HTTP 轮询机制。但我也不想,主要是因为我打算使用 Google Cloud 运行 来限制所有连接关闭时的 CPU 。而且我认为教我的任务如何直接通过 WebSocket 进行通信是更好的做法。
我希望我的问题很清楚。这是我第一个使用 FastAPI 和异步的大型项目,之前没有真正使用过 AsyncIO。所以我可能只是错过了一些东西。感谢您的建议。
如果有人遇到这个问题,我现在将添加适合我的解决方案。
我在关注这个:https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor
关键是让它成为非阻塞的。因此,例如,而不是:
# 1. Run in the default loop's executor:
result = await loop.run_in_executor(None, blocking_io)
print('default thread pool', result)
我移动等待并将代码更改为:
# 1. Run in the default loop's executor:
thread = loop.run_in_executor(None, blocking_io)
print('default thread pool', result)
while True:
asyncio.sleep(1)
websocket.send_text('status updates...'
if internal_logger.blocking_stuff_finished:
break
result = await thread
websocket.send_text('result:', result)
websocket.close()
这样我就可以将 cpu_bound 的东西放在一个单独的线程中,我不需要等待,一切正常。
制作自定义线程池也可以,但我们需要删除上下文管理器以使其成为非阻塞的。
# 2. Run in a custom thread pool:
with concurrent.futures.ThreadPoolExecutor() as pool:
result = await loop.run_in_executor(pool, blocking_io)
然后会变成:
pool = concurrent.futures.ThreadPoolExecutor()
thread = loop.run_in_executor(pool, blocking_io)
理论上,这同样适用于 ProcessPoolExecutor,但需要做更多的工作,因为没有共享内存并且我的终止条件不会像上面描述的那样工作。
是的,我知道 cpu_bound 事情最好在不同的进程中完成,但将它移动到单独的线程对我来说不起作用,我确实喜欢共享内存 atm。