在 Python FastAPI 应用程序中启动异步后台守护进程
Start an async background daemon in a Python FastAPI app
我正在使用 FastAPI 为分析系统构建异步后端。问题是它必须:a) 监听 API 呼叫并随时可用; b) 定期执行数据收集任务(解析数据并将其保存到数据库中)。
我写了这个函数作为守护进程:
async def start_metering_daemon(self) -> None:
"""sets a never ending task for metering"""
while True:
delay: int = self._get_delay() # delay in seconds until next execution
await asyncio.sleep(delay)
await self.gather_meterings() # perfom data gathering
我想要实现的是,当应用程序启动时,它还会将此守护程序函数添加到主事件循环中,并在有时间时执行它。但是,我一直无法找到适合任务规模的合适解决方案(添加 Celery 和类似的东西是一种矫枉过正)。
我尝试了以下方法来实现这一点,但 none 成功了:
@app.on_event("startup")
async def startup_event() -> None:
"""tasks to do at server startup"""
await Gatherer().start_metering_daemon()
结果:由于线程被阻塞,服务器无法启动
@app.on_event("startup")
async def startup_event() -> None:
"""tasks to do at server startup"""
fastapi.BackgroundTasks().add_task(Gatherer().start_metering_daemon)
结果:任务从未像日志中观察到的那样执行
@app.on_event("startup")
async def startup_event() -> None:
"""tasks to do at server startup"""
fastapi.BackgroundTasks().add_task(asyncio.run, Gatherer().start_metering_daemon())
结果:与上一个相同
@app.on_event("startup")
async def startup_event() -> None:
"""tasks to do at server startup"""
threading.Thread(target=asyncio.run, args=(Gatherer().start_metering_daemon(),)).start()
结果:这个有效,但是 a) 没有意义; b) 为 N 个 Uvicorn worker 生成 N 个相同的线程,这些线程都将相同的数据 N 次写入数据库。
我现在没有解决方案了。我很确定我的问题一定有解决方案,因为对我来说这看起来很微不足道,但我找不到。
如果您想了解更多上下文,请参阅我参考的项目的 repo。
尝试
@app.on_event("startup")
async def startup_event() -> None:
"""tasks to do at server startup"""
asyncio.create_task(Gatherer().start_metering_daemon())
我正在使用 FastAPI 为分析系统构建异步后端。问题是它必须:a) 监听 API 呼叫并随时可用; b) 定期执行数据收集任务(解析数据并将其保存到数据库中)。
我写了这个函数作为守护进程:
async def start_metering_daemon(self) -> None:
"""sets a never ending task for metering"""
while True:
delay: int = self._get_delay() # delay in seconds until next execution
await asyncio.sleep(delay)
await self.gather_meterings() # perfom data gathering
我想要实现的是,当应用程序启动时,它还会将此守护程序函数添加到主事件循环中,并在有时间时执行它。但是,我一直无法找到适合任务规模的合适解决方案(添加 Celery 和类似的东西是一种矫枉过正)。
我尝试了以下方法来实现这一点,但 none 成功了:
@app.on_event("startup")
async def startup_event() -> None:
"""tasks to do at server startup"""
await Gatherer().start_metering_daemon()
结果:由于线程被阻塞,服务器无法启动
@app.on_event("startup")
async def startup_event() -> None:
"""tasks to do at server startup"""
fastapi.BackgroundTasks().add_task(Gatherer().start_metering_daemon)
结果:任务从未像日志中观察到的那样执行
@app.on_event("startup")
async def startup_event() -> None:
"""tasks to do at server startup"""
fastapi.BackgroundTasks().add_task(asyncio.run, Gatherer().start_metering_daemon())
结果:与上一个相同
@app.on_event("startup")
async def startup_event() -> None:
"""tasks to do at server startup"""
threading.Thread(target=asyncio.run, args=(Gatherer().start_metering_daemon(),)).start()
结果:这个有效,但是 a) 没有意义; b) 为 N 个 Uvicorn worker 生成 N 个相同的线程,这些线程都将相同的数据 N 次写入数据库。
我现在没有解决方案了。我很确定我的问题一定有解决方案,因为对我来说这看起来很微不足道,但我找不到。
如果您想了解更多上下文,请参阅我参考的项目的 repo。
尝试
@app.on_event("startup")
async def startup_event() -> None:
"""tasks to do at server startup"""
asyncio.create_task(Gatherer().start_metering_daemon())