FastAPI: add long tasks to a buffer and process them one by one, while keeping the server responsive

I am trying to set up a FastAPI server that takes some biological data as input and runs some processing on it. Since the processing takes up all of the server's resources, queries should be processed sequentially. However, the server should stay responsive and keep adding incoming requests to a buffer. I've been trying to use the BackgroundTasks module for this, but after sending a second query, the response gets delayed while the first task is still running. Any help is appreciated, and thanks in advance.

import os
import sys
import time
from dataclasses import dataclass
from fastapi import FastAPI, Request, BackgroundTasks

EXPERIMENTS_BASE_DIR = "/experiments/"
QUERY_BUFFER = {}

app = FastAPI()

@dataclass
class Query():
    query_name: str
    query_sequence: str
    experiment_id: str = None
    status: str = "pending"

    def __post_init__(self):
        self.experiment_id = str(time.time())
        self.experiment_dir = os.path.join(EXPERIMENTS_BASE_DIR, self.experiment_id)
        os.makedirs(self.experiment_dir, exist_ok=False)

    def run(self):
        self.status = "running"
        # perform some long task using the query sequence and get a return code #
        self.status = "finished"
        return 0 # or another code depending on the final output

@app.post("/")
async def root(request: Request, background_tasks: BackgroundTasks):
    query_data = await request.body()
    query_data = query_data.decode("utf-8")
    query_data = dict(str(x).split("=") for x in query_data.split("&"))
    query = Query(**query_data)
    QUERY_BUFFER[query.experiment_id] = query
    background_tasks.add_task(process, query)
    return {"Query created": query, "Query ID": query.experiment_id, "Backlog Length": len(QUERY_BUFFER)}

async def process(query):
    """ Process query and generate data"""
    ret_code = await query.run()
    del QUERY_BUFFER[query.experiment_id]
    print(f'Query {query.experiment_id} processing finished with return code {ret_code}.')

@app.get("/backlog/")
def return_backlog():
    return {f"Currently {len(QUERY_BUFFER)} jobs in the backlog."}

I think your problem is in the task you want to run, rather than in BackgroundTask itself.

FastAPI (and the underlying Starlette, which is responsible for running the background tasks) is built on top of asyncio and handles all requests asynchronously. That means that if one request is being processed, and there is some IO operation during the processing of the current request that supports being awaited, FastAPI will switch to the next request in the queue while that IO operation is pending.

The same goes for any background tasks added to the queue. While a background task is running, other requests and background tasks will only be handled whenever FastAPI is waiting for some IO operation.
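To illustrate the difference, here is a minimal sketch (hypothetical endpoints, not from the original code): the first handler holds the event loop for 5 seconds, while the second yields control back to the loop, so other requests keep being served.

import asyncio
import time
from fastapi import FastAPI

app = FastAPI()

@app.get("/blocking")
async def blocking():
    time.sleep(5)           # blocks the whole event loop for 5 seconds
    return {"done": True}

@app.get("/non-blocking")
async def non_blocking():
    await asyncio.sleep(5)  # suspends only this request; the loop stays free
    return {"done": True}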

As you can see, this is not ideal when your view or task has no IO operations at all, or when those operations cannot be run asynchronously. There is a workaround for such situations:

  • Declare your view or task as a normal, non-async function.
    Starlette will then run those views in a separate thread, outside of the main async loop, so other requests can be handled at the same time.
  • Manually wrap any part of your logic that might block the handling of other requests with asgiref.sync_to_async (see the sketch after this list).
    This will also cause that logic to be executed in a separate thread, releasing the main async loop to take care of other requests until the function returns.
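For instance, a rough sketch of the second approach might look like this (run_query is a hypothetical blocking function, not from the original code):

from asgiref.sync import sync_to_async

def run_query(query):
    # blocking, non-async logic that would otherwise stall the event loop
    ...

async def process(query):
    # sync_to_async runs run_query in a worker thread and awaits its result,
    # leaving the main async loop free to handle other requests
    return await sync_to_async(run_query)(query)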

If you are not performing any async IO operations inside your long-running task, the first approach will suit you best. Otherwise, you should take whatever part of your code is long-running or performs non-async IO operations and wrap it with sync_to_async.

Edit:

The original answer was influenced by testing with httpx.AsyncClient (as flagged in the original caveat). The test client causes the background tasks to block in a way that they do not block without it. As such, there is a simpler solution, provided you don't need to test it with httpx.AsyncClient. The new solution uses uvicorn, and I then tested it manually with Postman.

This solution uses a function as the background task (process), so that it runs outside the main thread. It then schedules a job to run aprocess, which will run in the main thread when the event loop gets a chance. The aprocess coroutine can then await the query's run coroutine just as before.

Additionally, I've added a time.sleep(10) to the process function to illustrate that even long-running non-IO tasks will not prevent your original HTTP session from sending a response back to the client (although this only works if it is something that releases the GIL; if it's CPU-bound, you probably want a fully separate process, using multiprocessing or a separate service; a rough sketch of that alternative follows below). Finally, I replaced the prints with logging so that they work together with the uvicorn logging.
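As an aside on that CPU-bound caveat, here is a minimal sketch (not part of the original answer; cpu_heavy is a hypothetical stand-in for real work) of handing such work to a process pool, so it cannot hold the server process's GIL:

import asyncio
from concurrent.futures import ProcessPoolExecutor

pool = ProcessPoolExecutor()  # reused across requests

def cpu_heavy(n):
    # stand-in for real CPU-bound work
    return sum(i * i for i in range(n))

async def process_cpu_bound(n):
    loop = asyncio.get_running_loop()
    # offload cpu_heavy to a separate process and await the result
    # without blocking the event loop
    return await loop.run_in_executor(pool, cpu_heavy, n)

And here is the full updated example: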

import asyncio
import os
import sys
import time
from dataclasses import dataclass
from fastapi import FastAPI, Request, BackgroundTasks
import logging


logging.basicConfig(level=logging.INFO, format="%(levelname)-9s %(asctime)s - %(name)s - %(message)s")
LOGGER = logging.getLogger(__name__)

EXPERIMENTS_BASE_DIR = "/experiments/"
QUERY_BUFFER = {}

app = FastAPI()
loop = asyncio.get_event_loop()  # main event loop, used by run_coroutine_threadsafe below

@dataclass
class Query():
    query_name: str
    query_sequence: str
    experiment_id: str = None
    status: str = "pending"

    def __post_init__(self):
        self.experiment_id = str(time.time())
        self.experiment_dir = os.path.join(EXPERIMENTS_BASE_DIR, self.experiment_id)
        # os.makedirs(self.experiment_dir, exist_ok=False) # Commented out for testing

    async def run(self):
        self.status = "running"
        await asyncio.sleep(5)  # simulate long running query
        # perform some long task using the query sequence and get a return code #
        self.status = "finished"
        return 0 # or another code depending on the final output

@app.post("/")
async def root(request: Request, background_tasks: BackgroundTasks):
    query_data = await request.body()
    query_data = query_data.decode("utf-8")
    query_data = dict(str(x).split("=") for x in query_data.split("&"))
    query = Query(**query_data)
    QUERY_BUFFER[query.experiment_id] = query
    background_tasks.add_task(process, query)
    LOGGER.info(f'root - added task')
    return {"Query created": query, "Query ID": query.experiment_id, "Backlog Length": len(QUERY_BUFFER)}


def process(query):
    """ Schedule processing of query, and then run some long running non-IO job without blocking the app"""
    asyncio.run_coroutine_threadsafe(aprocess(query), loop)
    LOGGER.info(f"process - {query.experiment_id} - Submitted query job. Now run non-IO work for 10 seconds...")
    time.sleep(10) # simulate long running non-IO work, does not block app as this is in another thread - provided it is not cpu bound.
    LOGGER.info(f'process - {query.experiment_id} - wake up!')


async def aprocess(query):
    """ Process query and generate data """
    ret_code = await query.run()
    del QUERY_BUFFER[query.experiment_id]
    LOGGER.info(f'aprocess - Query {query.experiment_id} processing finished with return code {ret_code}.')


@app.get("/backlog/")
def return_backlog():
    return {f"return_backlog - Currently {len(QUERY_BUFFER)} jobs in the backlog."}


if __name__ == "__main__":
    import uvicorn
    uvicorn.run("scratch_26:app", host="127.0.0.1", port=8000)
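Assuming the file is saved as scratch_26.py and started as above, a quick manual check (shown here with the requests library instead of Postman) might look like this:

import requests

# form-encoded body, matching the "key=value&key=value" parsing in root()
resp = requests.post(
    "http://127.0.0.1:8000/",
    data={"query_name": "foo", "query_sequence": "42"},
)
print(resp.json())  # returns immediately, before the background job finishes

print(requests.get("http://127.0.0.1:8000/backlog/").json())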

Original answer:

*A caveat on this answer: I've been testing this with "httpx.AsyncClient", which may lead to different behavior compared to deploying behind uvicorn.*

As far as I can tell (and I'm very open to being corrected on this), BackgroundTasks actually need to complete before the HTTP response is sent. This is not what the Starlette docs or the FastAPI docs say, but it appears to be the case, at least while using the httpx AsyncClient.

Whether you add a coroutine (executed in the main thread) or a function (executed in its own side thread), the HTTP response is blocked from being sent until the background task is complete.

If you want to await a long-running (asyncio-friendly) task, you can get around this problem by using a wrapper function. The wrapper function adds the real task (a coroutine, since it will be using await) to the event loop and then returns. Since this happens very fast, the fact that it "blocks" no longer matters (assuming a few milliseconds don't matter).

The real task is then executed in turn (but after the initial HTTP response has been sent), and although it runs on the main thread, the asyncio parts of the function won't block.

You could try this:

@app.post("/")
async def root(request: Request, background_tasks: BackgroundTasks):
    ...
    background_tasks.add_task(process_wrapper, query)
    ...

async def process_wrapper(query):
    loop = asyncio.get_event_loop()
    loop.create_task(process(query))

async def process(query):
    """ Process query and generate data"""
    ret_code = await query.run()
    del QUERY_BUFFER[query.experiment_id]
    print(f'Query {query.experiment_id} processing finished with return code {ret_code}.')

Also note that you need to make the run() function a coroutine by adding the async keyword as well, since you are going to await it from your process() function.

Here's a full working example that uses httpx.AsyncClient to test it. I've added a fmt_duration helper function to show the elapsed time for illustration. I also commented out the code that creates the directories, and simulated a 2-second query duration in the run() function.

import asyncio
import os
import sys
import time
from dataclasses import dataclass
from fastapi import FastAPI, Request, BackgroundTasks
from httpx import AsyncClient

EXPERIMENTS_BASE_DIR = "/experiments/"
QUERY_BUFFER = {}

app = FastAPI()
start_ts = time.time()


@dataclass
class Query():
    query_name: str
    query_sequence: str
    experiment_id: str = None
    status: str = "pending"

    def __post_init__(self):
        self.experiment_id = str(time.time())
        self.experiment_dir = os.path.join(EXPERIMENTS_BASE_DIR, self.experiment_id)
        # os.makedirs(self.experiment_dir, exist_ok=False) # Commented out for testing

    async def run(self):
        self.status = "running"
        await asyncio.sleep(2)  # simulate long running query
        # perform some long task using the query sequence and get a return code #
        self.status = "finished"
        return 0 # or another code depending on the final output

@app.post("/")
async def root(request: Request, background_tasks: BackgroundTasks):
    query_data = await request.body()
    query_data = query_data.decode("utf-8")
    query_data = dict(str(x).split("=") for x in query_data.split("&"))
    query = Query(**query_data)
    QUERY_BUFFER[query.experiment_id] = query
    background_tasks.add_task(process_wrapper, query)
    print(f'{fmt_duration()} - root - added task')
    return {"Query created": query, "Query ID": query.experiment_id, "Backlog Length": len(QUERY_BUFFER)}


async def process_wrapper(query):
    loop = asyncio.get_event_loop()
    loop.create_task(process(query))

async def process(query):
    """ Process query and generate data"""
    ret_code = await query.run()
    del QUERY_BUFFER[query.experiment_id]
    print(f'{fmt_duration()} - process - Query {query.experiment_id} processing finished with return code {ret_code}.')

@app.get("/backlog/")
def return_backlog():
    return {f"{fmt_duration()} - return_backlog - Currently {len(QUERY_BUFFER)} jobs in the backlog."}


async def test_me():
    async with AsyncClient(app=app, base_url="http://example") as ac:
        res = await ac.post("/", content="query_name=foo&query_sequence=42")
        print(f"{fmt_duration()} - [{res.status_code}] - {res.content.decode('utf8')}")
        res = await ac.post("/", content="query_name=bar&query_sequence=43")
        print(f"{fmt_duration()} - [{res.status_code}] - {res.content.decode('utf8')}")
        content = ""
        while not content.endswith('0 jobs in the backlog."]'):
            await asyncio.sleep(1)
            backlog_results = await ac.get("/backlog")
            content = backlog_results.content.decode("utf8")
            print(f"{fmt_duration()} - test_me - content: {content}")


def fmt_duration():
    return f"Progress time: {time.time() - start_ts:.3f}s"

loop = asyncio.get_event_loop()
print(f'starting loop...')
loop.run_until_complete(test_me())
duration = time.time() - start_ts
print(f'Finished. Duration: {duration:.3f} seconds.')

In my local environment, if I run the above I get this output:

starting loop...
Progress time: 0.005s - root - added task
Progress time: 0.006s - [200] - {"Query created":{"query_name":"foo","query_sequence":"42","experiment_id":"1627489235.9300923","status":"pending","experiment_dir":"/experiments/1627489235.9300923"},"Query ID":"1627489235.9300923","Backlog Length":1}
Progress time: 0.007s - root - added task
Progress time: 0.009s - [200] - {"Query created":{"query_name":"bar","query_sequence":"43","experiment_id":"1627489235.932097","status":"pending","experiment_dir":"/experiments/1627489235.932097"},"Query ID":"1627489235.932097","Backlog Length":2}
Progress time: 1.016s - test_me - content: ["Progress time: 1.015s - return_backlog - Currently 2 jobs in the backlog."]
Progress time: 2.008s - process - Query 1627489235.9300923 processing finished with return code 0.
Progress time: 2.008s - process - Query 1627489235.932097 processing finished with return code 0.
Progress time: 2.041s - test_me - content: ["Progress time: 2.041s - return_backlog - Currently 0 jobs in the backlog."]
Finished. Duration: 2.041 seconds.

I also tried making process_wrapper a plain function, so that Starlette executes it in a new thread. It works the same way, just using run_coroutine_threadsafe instead of create_task:

def process_wrapper(query):
    loop = asyncio.get_event_loop()
    asyncio.run_coroutine_threadsafe(process(query), loop)

If there is some other way to get a background task to run without blocking the HTTP response, I'd love to find out how, but absent that, this wrapper solution should work.