FastAPI,添加长任务来缓冲并逐一处理,同时保持服务器响应能力
FastAPI, add long tasks to buffer and process them one by one, while maintaining server responsiveness
我正在尝试设置一个 FastAPI 服务器,它将一些生物数据作为输入,并 运行 对它们进行一些处理。由于处理会占用服务器的所有资源,因此应按顺序处理查询。但是,服务器应保持响应并在缓冲区中添加更多请求。我一直在尝试为此使用 BackgroundTasks 模块,但在发送第二个查询后,响应在任务 运行ning 期间延迟。感谢任何帮助,并提前致谢。
import os
import sys
import time
from dataclasses import dataclass
from fastapi import FastAPI, Request, BackgroundTasks
EXPERIMENTS_BASE_DIR = "/experiments/"
QUERY_BUFFER = {}
app = FastAPI()
@dataclass
class Query():
query_name: str
query_sequence: str
experiment_id: str = None
status: str = "pending"
def __post_init__(self):
self.experiment_id = str(time.time())
self.experiment_dir = os.path.join(EXPERIMENTS_BASE_DIR, self.experiment_id)
os.makedirs(self.experiment_dir, exist_ok=False)
def run(self):
self.status = "running"
# perform some long task using the query sequence and get a return code #
self.status = "finished"
return 0 # or another code depending on the final output
@app.post("/")
async def root(request: Request, background_tasks: BackgroundTasks):
query_data = await request.body()
query_data = query_data.decode("utf-8")
query_data = dict(str(x).split("=") for x in query_data.split("&"))
query = Query(**query_data)
QUERY_BUFFER[query.experiment_id] = query
background_tasks.add_task(process, query)
return {"Query created": query, "Query ID": query.experiment_id, "Backlog Length": len(QUERY_BUFFER)}
async def process(query):
""" Process query and generate data"""
ret_code = await query.run()
del QUERY_BUFFER[query.experiment_id]
print(f'Query {query.experiment_id} processing finished with return code {ret_code}.')
@app.get("/backlog/")
def return_backlog():
return {f"Currently {len(QUERY_BUFFER)} jobs in the backlog."}
我认为你的问题出在你想要 运行 的任务上,而不是 BackgroundTask
本身。
FastAPI(以及负责 运行 后台任务的底层 Starlette)在 asyncio 之上创建并异步处理所有请求。这意味着,如果一个请求正在处理,如果在处理当前请求时有任何 IO 操作,并且该 IO 操作支持异步方式,FastAPI 将在该 IO 操作挂起时切换到队列中的下一个请求。
添加到队列中的任何后台任务也是如此。如果后台任务挂起,则只有在 FastAPI 等待任何 IO 操作时,才会处理任何请求或其他后台任务。
如您所见,当您的视图或任务没有任何 IO 操作或它们不能 运行 异步时,这并不理想。这种情况有一个解决方法:
- 将您的视图或任务声明为正常的非异步函数
然后 Starlette 将 运行 在主异步循环之外的单独线程中处理这些视图,因此可以同时处理其他请求
- 手动 运行 您的逻辑中可能会阻止
使用
asgiref.sync_to_async
处理其他请求
这也会导致此逻辑在单独的线程中执行,释放主异步循环以处理其他请求,直到函数 returns.
如果您在长期运行ning 任务中不执行任何异步IO 操作,第一种方法将最适合您。否则,您应该将代码的任何部分是 long-运行ning 或执行任何非异步 IO 操作,并将其用 sync_to_async
.
包装起来
编辑:
最初的答案受到 httpx.AsyncClient
测试的影响(在最初的警告中可能是这种情况)。测试客户端导致后台任务阻塞,而这些任务在没有测试客户端的情况下不会阻塞。因此,如果您不想使用 httpx.AsyncClient
对其进行测试,则有一个更简单的解决方案。新的解决方案使用 uvicorn,然后我用 Postman 手动测试了它。
此解决方案使用函数作为后台任务 (process
),因此它 运行 在主线程之外。然后它将作业安排到 运行 aprocess
,当事件循环有机会时,它将 运行 在主线程中。 aprocess
协同程序可以像以前一样等待查询的 run
协同程序。
此外,我在 process
函数中添加了一个 time.sleep(10)
来说明即使是长时间的 运行ning 非 IO 任务也不会阻止您的原始 HTTP 会话发送一个返回给客户端的响应(尽管这仅在它是释放 GIL 的东西时才有效。如果它是 CPU 绑定的,尽管您可能希望通过使用多处理或单独的服务来完全分开一个进程)。最后,我用日志替换了打印,以便它们与 uvicorn 日志一起工作。
import asyncio
import os
import sys
import time
from dataclasses import dataclass
from fastapi import FastAPI, Request, BackgroundTasks
import logging
logging.basicConfig(level=logging.INFO, format="%(levelname)-9s %(asctime)s - %(name)s - %(message)s")
LOGGER = logging.getLogger(__name__)
EXPERIMENTS_BASE_DIR = "/experiments/"
QUERY_BUFFER = {}
app = FastAPI()
loop = asyncio.get_event_loop()
@dataclass
class Query():
query_name: str
query_sequence: str
experiment_id: str = None
status: str = "pending"
def __post_init__(self):
self.experiment_id = str(time.time())
self.experiment_dir = os.path.join(EXPERIMENTS_BASE_DIR, self.experiment_id)
# os.makedirs(self.experiment_dir, exist_ok=False) # Commented out for testing
async def run(self):
self.status = "running"
await asyncio.sleep(5) # simulate long running query
# perform some long task using the query sequence and get a return code #
self.status = "finished"
return 0 # or another code depending on the final output
@app.post("/")
async def root(request: Request, background_tasks: BackgroundTasks):
query_data = await request.body()
query_data = query_data.decode("utf-8")
query_data = dict(str(x).split("=") for x in query_data.split("&"))
query = Query(**query_data)
QUERY_BUFFER[query.experiment_id] = query
background_tasks.add_task(process, query)
LOGGER.info(f'root - added task')
return {"Query created": query, "Query ID": query.experiment_id, "Backlog Length": len(QUERY_BUFFER)}
def process(query):
""" Schedule processing of query, and then run some long running non-IO job without blocking the app"""
asyncio.run_coroutine_threadsafe(aprocess(query), loop)
LOGGER.info(f"process - {query.experiment_id} - Submitted query job. Now run non-IO work for 10 seconds...")
time.sleep(10) # simulate long running non-IO work, does not block app as this is in another thread - provided it is not cpu bound.
LOGGER.info(f'process - {query.experiment_id} - wake up!')
async def aprocess(query):
""" Process query and generate data """
ret_code = await query.run()
del QUERY_BUFFER[query.experiment_id]
LOGGER.info(f'aprocess - Query {query.experiment_id} processing finished with return code {ret_code}.')
@app.get("/backlog/")
def return_backlog():
return {f"return_backlog - Currently {len(QUERY_BUFFER)} jobs in the backlog."}
if __name__ == "__main__":
import uvicorn
uvicorn.run("scratch_26:app", host="127.0.0.1", port=8000)
原始答案:
*关于这个答案的警告 - 我已经尝试使用“httpx.AsyncClient”对此进行测试,与部署在 guvicorn 后面相比,这可能会导致不同的行为。*
据我所知(对此我非常愿意接受更正),BackgroundTasks
实际上需要在 发送 HTTP 响应之前完成。这不是 Starlette 文档或 FastAPI 文档所说的,但似乎是这样,至少在使用 httpx AsyncClient 时是这样。
无论您添加一个协程(在主线程中执行)还是一个函数(在它自己的线程中执行),在后台任务完成之前阻止发送 HTTP 响应。
如果你想等待一个很长的 运行ning(异步友好)任务,你可以通过使用包装函数来解决这个问题。包装函数将真正的任务(协程,因为它将使用 await
)添加到事件循环,然后添加到 returns。由于这非常快,它“阻塞”的事实不再重要(假设几毫秒无关紧要)。
然后真正的任务依次执行(但在发送初始 HTTP 响应之后),虽然它在主线程上,但函数的 asyncio 部分不会阻塞。
你可以试试这个:
@app.post("/")
async def root(request: Request, background_tasks: BackgroundTasks):
...
background_tasks.add_task(process_wrapper, query)
...
async def process_wrapper(query):
loop = asyncio.get_event_loop()
loop.create_task(process(query))
async def process(query):
""" Process query and generate data"""
ret_code = await query.run()
del QUERY_BUFFER[query.experiment_id]
print(f'Query {query.experiment_id} processing finished with return code {ret_code}.')
另请注意,您还需要通过添加 async
关键字使 run()
函数成为协程,因为您希望从 process()
函数中等待它。
这是一个完整的工作示例,使用 httpx.AsyncClient
对其进行测试。为了便于说明,我添加了 fmt_duration
辅助函数来显示流逝的时间。我还注释掉了创建目录的代码,并在 run()
函数中模拟了 2 秒的查询持续时间。
import asyncio
import os
import sys
import time
from dataclasses import dataclass
from fastapi import FastAPI, Request, BackgroundTasks
from httpx import AsyncClient
EXPERIMENTS_BASE_DIR = "/experiments/"
QUERY_BUFFER = {}
app = FastAPI()
start_ts = time.time()
@dataclass
class Query():
query_name: str
query_sequence: str
experiment_id: str = None
status: str = "pending"
def __post_init__(self):
self.experiment_id = str(time.time())
self.experiment_dir = os.path.join(EXPERIMENTS_BASE_DIR, self.experiment_id)
# os.makedirs(self.experiment_dir, exist_ok=False) # Commented out for testing
async def run(self):
self.status = "running"
await asyncio.sleep(2) # simulate long running query
# perform some long task using the query sequence and get a return code #
self.status = "finished"
return 0 # or another code depending on the final output
@app.post("/")
async def root(request: Request, background_tasks: BackgroundTasks):
query_data = await request.body()
query_data = query_data.decode("utf-8")
query_data = dict(str(x).split("=") for x in query_data.split("&"))
query = Query(**query_data)
QUERY_BUFFER[query.experiment_id] = query
background_tasks.add_task(process_wrapper, query)
print(f'{fmt_duration()} - root - added task')
return {"Query created": query, "Query ID": query.experiment_id, "Backlog Length": len(QUERY_BUFFER)}
async def process_wrapper(query):
loop = asyncio.get_event_loop()
loop.create_task(process(query))
async def process(query):
""" Process query and generate data"""
ret_code = await query.run()
del QUERY_BUFFER[query.experiment_id]
print(f'{fmt_duration()} - process - Query {query.experiment_id} processing finished with return code {ret_code}.')
@app.get("/backlog/")
def return_backlog():
return {f"{fmt_duration()} - return_backlog - Currently {len(QUERY_BUFFER)} jobs in the backlog."}
async def test_me():
async with AsyncClient(app=app, base_url="http://example") as ac:
res = await ac.post("/", content="query_name=foo&query_sequence=42")
print(f"{fmt_duration()} - [{res.status_code}] - {res.content.decode('utf8')}")
res = await ac.post("/", content="query_name=bar&query_sequence=43")
print(f"{fmt_duration()} - [{res.status_code}] - {res.content.decode('utf8')}")
content = ""
while not content.endswith('0 jobs in the backlog."]'):
await asyncio.sleep(1)
backlog_results = await ac.get("/backlog")
content = backlog_results.content.decode("utf8")
print(f"{fmt_duration()} - test_me - content: {content}")
def fmt_duration():
return f"Progress time: {time.time() - start_ts:.3f}s"
loop = asyncio.get_event_loop()
print(f'starting loop...')
loop.run_until_complete(test_me())
duration = time.time() - start_ts
print(f'Finished. Duration: {duration:.3f} seconds.')
在我的本地环境中,如果我 运行 以上我得到这个输出:
starting loop...
Progress time: 0.005s - root - added task
Progress time: 0.006s - [200] - {"Query created":{"query_name":"foo","query_sequence":"42","experiment_id":"1627489235.9300923","status":"pending","experiment_dir":"/experiments/1627489235.9300923"},"Query ID":"1627489235.9300923","Backlog Length":1}
Progress time: 0.007s - root - added task
Progress time: 0.009s - [200] - {"Query created":{"query_name":"bar","query_sequence":"43","experiment_id":"1627489235.932097","status":"pending","experiment_dir":"/experiments/1627489235.932097"},"Query ID":"1627489235.932097","Backlog Length":2}
Progress time: 1.016s - test_me - content: ["Progress time: 1.015s - return_backlog - Currently 2 jobs in the backlog."]
Progress time: 2.008s - process - Query 1627489235.9300923 processing finished with return code 0.
Progress time: 2.008s - process - Query 1627489235.932097 processing finished with return code 0.
Progress time: 2.041s - test_me - content: ["Progress time: 2.041s - return_backlog - Currently 0 jobs in the backlog."]
Finished. Duration: 2.041 seconds.
我还尝试使 process_wrapper
成为一个函数,以便 Starlette 在新线程中执行它。这以相同的方式工作,只需使用 run_coroutine_threadsafe
而不是 create_task
即
def process_wrapper(query):
loop = asyncio.get_event_loop()
asyncio.run_coroutine_threadsafe(process(query), loop)
如果有一些其他方法可以在不阻止 HTTP 响应的情况下将后台任务发送到 运行,我很想知道如何做,但缺少这个包装器解决方案应该起作用。
我正在尝试设置一个 FastAPI 服务器,它将一些生物数据作为输入,并 运行 对它们进行一些处理。由于处理会占用服务器的所有资源,因此应按顺序处理查询。但是,服务器应保持响应并在缓冲区中添加更多请求。我一直在尝试为此使用 BackgroundTasks 模块,但在发送第二个查询后,响应在任务 运行ning 期间延迟。感谢任何帮助,并提前致谢。
import os
import sys
import time
from dataclasses import dataclass
from fastapi import FastAPI, Request, BackgroundTasks
EXPERIMENTS_BASE_DIR = "/experiments/"
QUERY_BUFFER = {}
app = FastAPI()
@dataclass
class Query():
query_name: str
query_sequence: str
experiment_id: str = None
status: str = "pending"
def __post_init__(self):
self.experiment_id = str(time.time())
self.experiment_dir = os.path.join(EXPERIMENTS_BASE_DIR, self.experiment_id)
os.makedirs(self.experiment_dir, exist_ok=False)
def run(self):
self.status = "running"
# perform some long task using the query sequence and get a return code #
self.status = "finished"
return 0 # or another code depending on the final output
@app.post("/")
async def root(request: Request, background_tasks: BackgroundTasks):
query_data = await request.body()
query_data = query_data.decode("utf-8")
query_data = dict(str(x).split("=") for x in query_data.split("&"))
query = Query(**query_data)
QUERY_BUFFER[query.experiment_id] = query
background_tasks.add_task(process, query)
return {"Query created": query, "Query ID": query.experiment_id, "Backlog Length": len(QUERY_BUFFER)}
async def process(query):
""" Process query and generate data"""
ret_code = await query.run()
del QUERY_BUFFER[query.experiment_id]
print(f'Query {query.experiment_id} processing finished with return code {ret_code}.')
@app.get("/backlog/")
def return_backlog():
return {f"Currently {len(QUERY_BUFFER)} jobs in the backlog."}
我认为你的问题出在你想要 运行 的任务上,而不是 BackgroundTask
本身。
FastAPI(以及负责 运行 后台任务的底层 Starlette)在 asyncio 之上创建并异步处理所有请求。这意味着,如果一个请求正在处理,如果在处理当前请求时有任何 IO 操作,并且该 IO 操作支持异步方式,FastAPI 将在该 IO 操作挂起时切换到队列中的下一个请求。
添加到队列中的任何后台任务也是如此。如果后台任务挂起,则只有在 FastAPI 等待任何 IO 操作时,才会处理任何请求或其他后台任务。
如您所见,当您的视图或任务没有任何 IO 操作或它们不能 运行 异步时,这并不理想。这种情况有一个解决方法:
- 将您的视图或任务声明为正常的非异步函数
然后 Starlette 将 运行 在主异步循环之外的单独线程中处理这些视图,因此可以同时处理其他请求 - 手动 运行 您的逻辑中可能会阻止
使用
asgiref.sync_to_async
处理其他请求 这也会导致此逻辑在单独的线程中执行,释放主异步循环以处理其他请求,直到函数 returns.
如果您在长期运行ning 任务中不执行任何异步IO 操作,第一种方法将最适合您。否则,您应该将代码的任何部分是 long-运行ning 或执行任何非异步 IO 操作,并将其用 sync_to_async
.
编辑:
最初的答案受到 httpx.AsyncClient
测试的影响(在最初的警告中可能是这种情况)。测试客户端导致后台任务阻塞,而这些任务在没有测试客户端的情况下不会阻塞。因此,如果您不想使用 httpx.AsyncClient
对其进行测试,则有一个更简单的解决方案。新的解决方案使用 uvicorn,然后我用 Postman 手动测试了它。
此解决方案使用函数作为后台任务 (process
),因此它 运行 在主线程之外。然后它将作业安排到 运行 aprocess
,当事件循环有机会时,它将 运行 在主线程中。 aprocess
协同程序可以像以前一样等待查询的 run
协同程序。
此外,我在 process
函数中添加了一个 time.sleep(10)
来说明即使是长时间的 运行ning 非 IO 任务也不会阻止您的原始 HTTP 会话发送一个返回给客户端的响应(尽管这仅在它是释放 GIL 的东西时才有效。如果它是 CPU 绑定的,尽管您可能希望通过使用多处理或单独的服务来完全分开一个进程)。最后,我用日志替换了打印,以便它们与 uvicorn 日志一起工作。
import asyncio
import os
import sys
import time
from dataclasses import dataclass
from fastapi import FastAPI, Request, BackgroundTasks
import logging
logging.basicConfig(level=logging.INFO, format="%(levelname)-9s %(asctime)s - %(name)s - %(message)s")
LOGGER = logging.getLogger(__name__)
EXPERIMENTS_BASE_DIR = "/experiments/"
QUERY_BUFFER = {}
app = FastAPI()
loop = asyncio.get_event_loop()
@dataclass
class Query():
query_name: str
query_sequence: str
experiment_id: str = None
status: str = "pending"
def __post_init__(self):
self.experiment_id = str(time.time())
self.experiment_dir = os.path.join(EXPERIMENTS_BASE_DIR, self.experiment_id)
# os.makedirs(self.experiment_dir, exist_ok=False) # Commented out for testing
async def run(self):
self.status = "running"
await asyncio.sleep(5) # simulate long running query
# perform some long task using the query sequence and get a return code #
self.status = "finished"
return 0 # or another code depending on the final output
@app.post("/")
async def root(request: Request, background_tasks: BackgroundTasks):
query_data = await request.body()
query_data = query_data.decode("utf-8")
query_data = dict(str(x).split("=") for x in query_data.split("&"))
query = Query(**query_data)
QUERY_BUFFER[query.experiment_id] = query
background_tasks.add_task(process, query)
LOGGER.info(f'root - added task')
return {"Query created": query, "Query ID": query.experiment_id, "Backlog Length": len(QUERY_BUFFER)}
def process(query):
""" Schedule processing of query, and then run some long running non-IO job without blocking the app"""
asyncio.run_coroutine_threadsafe(aprocess(query), loop)
LOGGER.info(f"process - {query.experiment_id} - Submitted query job. Now run non-IO work for 10 seconds...")
time.sleep(10) # simulate long running non-IO work, does not block app as this is in another thread - provided it is not cpu bound.
LOGGER.info(f'process - {query.experiment_id} - wake up!')
async def aprocess(query):
""" Process query and generate data """
ret_code = await query.run()
del QUERY_BUFFER[query.experiment_id]
LOGGER.info(f'aprocess - Query {query.experiment_id} processing finished with return code {ret_code}.')
@app.get("/backlog/")
def return_backlog():
return {f"return_backlog - Currently {len(QUERY_BUFFER)} jobs in the backlog."}
if __name__ == "__main__":
import uvicorn
uvicorn.run("scratch_26:app", host="127.0.0.1", port=8000)
原始答案:
*关于这个答案的警告 - 我已经尝试使用“httpx.AsyncClient”对此进行测试,与部署在 guvicorn 后面相比,这可能会导致不同的行为。*据我所知(对此我非常愿意接受更正),BackgroundTasks
实际上需要在 发送 HTTP 响应之前完成。这不是 Starlette 文档或 FastAPI 文档所说的,但似乎是这样,至少在使用 httpx AsyncClient 时是这样。
无论您添加一个协程(在主线程中执行)还是一个函数(在它自己的线程中执行),在后台任务完成之前阻止发送 HTTP 响应。
如果你想等待一个很长的 运行ning(异步友好)任务,你可以通过使用包装函数来解决这个问题。包装函数将真正的任务(协程,因为它将使用 await
)添加到事件循环,然后添加到 returns。由于这非常快,它“阻塞”的事实不再重要(假设几毫秒无关紧要)。
然后真正的任务依次执行(但在发送初始 HTTP 响应之后),虽然它在主线程上,但函数的 asyncio 部分不会阻塞。
你可以试试这个:
@app.post("/")
async def root(request: Request, background_tasks: BackgroundTasks):
...
background_tasks.add_task(process_wrapper, query)
...
async def process_wrapper(query):
loop = asyncio.get_event_loop()
loop.create_task(process(query))
async def process(query):
""" Process query and generate data"""
ret_code = await query.run()
del QUERY_BUFFER[query.experiment_id]
print(f'Query {query.experiment_id} processing finished with return code {ret_code}.')
另请注意,您还需要通过添加 async
关键字使 run()
函数成为协程,因为您希望从 process()
函数中等待它。
这是一个完整的工作示例,使用 httpx.AsyncClient
对其进行测试。为了便于说明,我添加了 fmt_duration
辅助函数来显示流逝的时间。我还注释掉了创建目录的代码,并在 run()
函数中模拟了 2 秒的查询持续时间。
import asyncio
import os
import sys
import time
from dataclasses import dataclass
from fastapi import FastAPI, Request, BackgroundTasks
from httpx import AsyncClient
EXPERIMENTS_BASE_DIR = "/experiments/"
QUERY_BUFFER = {}
app = FastAPI()
start_ts = time.time()
@dataclass
class Query():
query_name: str
query_sequence: str
experiment_id: str = None
status: str = "pending"
def __post_init__(self):
self.experiment_id = str(time.time())
self.experiment_dir = os.path.join(EXPERIMENTS_BASE_DIR, self.experiment_id)
# os.makedirs(self.experiment_dir, exist_ok=False) # Commented out for testing
async def run(self):
self.status = "running"
await asyncio.sleep(2) # simulate long running query
# perform some long task using the query sequence and get a return code #
self.status = "finished"
return 0 # or another code depending on the final output
@app.post("/")
async def root(request: Request, background_tasks: BackgroundTasks):
query_data = await request.body()
query_data = query_data.decode("utf-8")
query_data = dict(str(x).split("=") for x in query_data.split("&"))
query = Query(**query_data)
QUERY_BUFFER[query.experiment_id] = query
background_tasks.add_task(process_wrapper, query)
print(f'{fmt_duration()} - root - added task')
return {"Query created": query, "Query ID": query.experiment_id, "Backlog Length": len(QUERY_BUFFER)}
async def process_wrapper(query):
loop = asyncio.get_event_loop()
loop.create_task(process(query))
async def process(query):
""" Process query and generate data"""
ret_code = await query.run()
del QUERY_BUFFER[query.experiment_id]
print(f'{fmt_duration()} - process - Query {query.experiment_id} processing finished with return code {ret_code}.')
@app.get("/backlog/")
def return_backlog():
return {f"{fmt_duration()} - return_backlog - Currently {len(QUERY_BUFFER)} jobs in the backlog."}
async def test_me():
async with AsyncClient(app=app, base_url="http://example") as ac:
res = await ac.post("/", content="query_name=foo&query_sequence=42")
print(f"{fmt_duration()} - [{res.status_code}] - {res.content.decode('utf8')}")
res = await ac.post("/", content="query_name=bar&query_sequence=43")
print(f"{fmt_duration()} - [{res.status_code}] - {res.content.decode('utf8')}")
content = ""
while not content.endswith('0 jobs in the backlog."]'):
await asyncio.sleep(1)
backlog_results = await ac.get("/backlog")
content = backlog_results.content.decode("utf8")
print(f"{fmt_duration()} - test_me - content: {content}")
def fmt_duration():
return f"Progress time: {time.time() - start_ts:.3f}s"
loop = asyncio.get_event_loop()
print(f'starting loop...')
loop.run_until_complete(test_me())
duration = time.time() - start_ts
print(f'Finished. Duration: {duration:.3f} seconds.')
在我的本地环境中,如果我 运行 以上我得到这个输出:
starting loop...
Progress time: 0.005s - root - added task
Progress time: 0.006s - [200] - {"Query created":{"query_name":"foo","query_sequence":"42","experiment_id":"1627489235.9300923","status":"pending","experiment_dir":"/experiments/1627489235.9300923"},"Query ID":"1627489235.9300923","Backlog Length":1}
Progress time: 0.007s - root - added task
Progress time: 0.009s - [200] - {"Query created":{"query_name":"bar","query_sequence":"43","experiment_id":"1627489235.932097","status":"pending","experiment_dir":"/experiments/1627489235.932097"},"Query ID":"1627489235.932097","Backlog Length":2}
Progress time: 1.016s - test_me - content: ["Progress time: 1.015s - return_backlog - Currently 2 jobs in the backlog."]
Progress time: 2.008s - process - Query 1627489235.9300923 processing finished with return code 0.
Progress time: 2.008s - process - Query 1627489235.932097 processing finished with return code 0.
Progress time: 2.041s - test_me - content: ["Progress time: 2.041s - return_backlog - Currently 0 jobs in the backlog."]
Finished. Duration: 2.041 seconds.
我还尝试使 process_wrapper
成为一个函数,以便 Starlette 在新线程中执行它。这以相同的方式工作,只需使用 run_coroutine_threadsafe
而不是 create_task
即
def process_wrapper(query):
loop = asyncio.get_event_loop()
asyncio.run_coroutine_threadsafe(process(query), loop)
如果有一些其他方法可以在不阻止 HTTP 响应的情况下将后台任务发送到 运行,我很想知道如何做,但缺少这个包装器解决方案应该起作用。