FastAPI 异步后台任务阻塞其他请求?
FastAPI asynchronous background tasks blocks other requests?
我想 运行 FastAPI 中的一个简单后台任务,它涉及在将其转储到数据库之前进行一些计算。但是,计算会阻止它接收更多请求。
from fastapi import BackgroundTasks, FastAPI
app = FastAPI()
db = Database()
async def task(data):
otherdata = await db.fetch("some sql")
newdata = somelongcomputation(data,otherdata) # this blocks other requests
await db.execute("some sql",newdata)
@app.post("/profile")
async def profile(data: Data, background_tasks: BackgroundTasks):
background_tasks.add_task(task, data)
return {}
解决此问题的最佳方法是什么?
你的 task
被定义为 async
,这意味着 fastapi(或者更确切地说是 starlette)将在 asyncio 事件循环中 运行 它。
并且因为 somelongcomputation
是同步的(即不等待某些 IO,而是进行计算)只要它是 运行ning 就会阻塞事件循环。
我看到了一些解决这个问题的方法:
使用更多工人(例如 uvicorn main:app --workers 4
)。这将允许最多 4 个 somelongcomputation
并行。
重写您的任务,使其不是 async
(即将其定义为 def task(data): ...
等)。然后 starlette 将 运行 它放在一个单独的线程中。
使用fastapi.concurrency.run_in_threadpool
,这也会运行它在一个单独的线程中。像这样:
from fastapi.concurrency import run_in_threadpool
async def task(data):
otherdata = await db.fetch("some sql")
newdata = await run_in_threadpool(lambda: somelongcomputation(data, otherdata))
await db.execute("some sql", newdata)
- 或直接使用
asyncios
's run_in_executor
(run_in_threadpool
在后台使用):
import asyncio
async def task(data):
otherdata = await db.fetch("some sql")
loop = asyncio.get_running_loop()
newdata = await loop.run_in_executor(None, lambda: somelongcomputation(data, otherdata))
await db.execute("some sql", newdata)
您甚至可以在单独的过程中将 concurrent.futures.ProcessPoolExecutor
作为 run_in_threadpool
的第一个参数传递给 运行。
自己生成一个单独的线程/进程。例如。使用 concurrent.futures
.
用芹菜之类的比较重的东西。 (在 fastapi 文档中也提到 here)。
读这个issue。
同样在下面的示例中,my_model.function_b
可以是任何阻塞函数或进程。
TL;DR
from starlette.concurrency import run_in_threadpool
@app.get("/long_answer")
async def long_answer():
rst = await run_in_threadpool(my_model.function_b, arg_1, arg_2)
return rst
我想 运行 FastAPI 中的一个简单后台任务,它涉及在将其转储到数据库之前进行一些计算。但是,计算会阻止它接收更多请求。
from fastapi import BackgroundTasks, FastAPI
app = FastAPI()
db = Database()
async def task(data):
otherdata = await db.fetch("some sql")
newdata = somelongcomputation(data,otherdata) # this blocks other requests
await db.execute("some sql",newdata)
@app.post("/profile")
async def profile(data: Data, background_tasks: BackgroundTasks):
background_tasks.add_task(task, data)
return {}
解决此问题的最佳方法是什么?
你的 task
被定义为 async
,这意味着 fastapi(或者更确切地说是 starlette)将在 asyncio 事件循环中 运行 它。
并且因为 somelongcomputation
是同步的(即不等待某些 IO,而是进行计算)只要它是 运行ning 就会阻塞事件循环。
我看到了一些解决这个问题的方法:
使用更多工人(例如
uvicorn main:app --workers 4
)。这将允许最多 4 个somelongcomputation
并行。重写您的任务,使其不是
async
(即将其定义为def task(data): ...
等)。然后 starlette 将 运行 它放在一个单独的线程中。使用
fastapi.concurrency.run_in_threadpool
,这也会运行它在一个单独的线程中。像这样:from fastapi.concurrency import run_in_threadpool async def task(data): otherdata = await db.fetch("some sql") newdata = await run_in_threadpool(lambda: somelongcomputation(data, otherdata)) await db.execute("some sql", newdata)
- 或直接使用
asyncios
'srun_in_executor
(run_in_threadpool
在后台使用):
您甚至可以在单独的过程中将import asyncio async def task(data): otherdata = await db.fetch("some sql") loop = asyncio.get_running_loop() newdata = await loop.run_in_executor(None, lambda: somelongcomputation(data, otherdata)) await db.execute("some sql", newdata)
concurrent.futures.ProcessPoolExecutor
作为run_in_threadpool
的第一个参数传递给 运行。
- 或直接使用
自己生成一个单独的线程/进程。例如。使用
concurrent.futures
.用芹菜之类的比较重的东西。 (在 fastapi 文档中也提到 here)。
读这个issue。
同样在下面的示例中,my_model.function_b
可以是任何阻塞函数或进程。
TL;DR
from starlette.concurrency import run_in_threadpool
@app.get("/long_answer")
async def long_answer():
rst = await run_in_threadpool(my_model.function_b, arg_1, arg_2)
return rst