FastAPI 异步后台任务阻塞其他请求?

FastAPI asynchronous background tasks blocks other requests?

我想 运行 FastAPI 中的一个简单后台任务,它涉及在将其转储到数据库之前进行一些计算。但是,计算会阻止它接收更多请求。

from fastapi import BackgroundTasks, FastAPI

app = FastAPI()
db = Database()

async def task(data):
    otherdata = await db.fetch("some sql")
    newdata = somelongcomputation(data,otherdata) # this blocks other requests
    await db.execute("some sql",newdata)
   


@app.post("/profile")
async def profile(data: Data, background_tasks: BackgroundTasks):
    background_tasks.add_task(task, data)
    return {}

解决此问题的最佳方法是什么?

你的 task 被定义为 async,这意味着 fastapi(或者更确切地说是 starlette)将在 asyncio 事件循环中 运行 它。 并且因为 somelongcomputation 是同步的(即不等待某些 IO,而是进行计算)只要它是 运行ning 就会阻塞事件循环。

我看到了一些解决这个问题的方法:

  • 使用更多工人(例如 uvicorn main:app --workers 4)。这将允许最多 4 个 somelongcomputation 并行。

  • 重写您的任务,使其不是 async(即将其定义为 def task(data): ... 等)。然后 starlette 将 运行 它放在一个单独的线程中。

  • 使用fastapi.concurrency.run_in_threadpool,这也会运行它在一个单独的线程中。像这样:

    from fastapi.concurrency import run_in_threadpool
    async def task(data):
        otherdata = await db.fetch("some sql")
        newdata = await run_in_threadpool(lambda: somelongcomputation(data, otherdata))
        await db.execute("some sql", newdata)
    
    • 或直接使用 asyncios's run_in_executorrun_in_threadpool 在后台使用):
      import asyncio
      async def task(data):
          otherdata = await db.fetch("some sql")
          loop = asyncio.get_running_loop()
          newdata = await loop.run_in_executor(None, lambda: somelongcomputation(data, otherdata))
          await db.execute("some sql", newdata)
      
      您甚至可以在单独的过程中将 concurrent.futures.ProcessPoolExecutor 作为 run_in_threadpool 的第一个参数传递给 运行。
  • 自己生成一个单独的线程/进程。例如。使用 concurrent.futures.

  • 用芹菜之类的比较重的东西。 (在 fastapi 文档中也提到 here)。

读这个issue

同样在下面的示例中,my_model.function_b 可以是任何阻塞函数或进程。

TL;DR

from starlette.concurrency import run_in_threadpool

@app.get("/long_answer")
async def long_answer():
    rst = await run_in_threadpool(my_model.function_b, arg_1, arg_2)
    return rst