HTTP服务器启动后台python脚本无阻塞
HTTP server kick-off background python script without blocking
我希望能够通过 Web 请求以简单的方式触发长 运行ning python 脚本。此外,我希望能够在初始副本仍 运行ning.
时触发具有不同参数的脚本的其他副本
我研究了 flask、aiohttp 和排队的可能性。 Flask 和 aiohttp 的设置开销似乎最少。我计划通过 subprocess.run 执行现有的 python 脚本(但是,我确实考虑过将脚本重构为可用于 Web 响应功能的库)。
使用 aiohttp,我正在尝试类似的操作:
ingestion_service.py:
from aiohttp import web
from pprint import pprint
routes = web.RouteTableDef()
@routes.get("/ingest_pipeline")
async def test_ingest_pipeline(request):
'''
Get the job_conf specified from the request and activate the script
'''
#subprocess.run the command with lookup of job conf file
response = web.Response(text=f"Received data ingestion request")
await response.prepare(request)
await response.write_eof()
#eventually this would be subprocess.run call
time.sleep(80)
return response
def init_func(argv):
app = web.Application()
app.add_routes(routes)
return app
但是虽然初始请求 returns 立即执行,但后续请求会阻塞,直到初始请求完成。我正在 运行 通过以下方式安装服务器:
python -m aiohttp.web -H localhost -P 8080 ingestion_service:init_func
我知道多线程和并发可能提供比 asyncio 更好的解决方案。在这种情况下,我不是在寻找一个强大的解决方案,只是让我可以通过 http 请求一次 运行 多个脚本,理想情况下内存成本最低。
好的,我在做的事情有几个问题。也就是说,time.sleep() 是阻塞的,所以应该使用 asyncio.sleep()。但是,由于我对生成子进程感兴趣,我可以使用 asyncio.subprocess 以非阻塞方式执行此操作。
注意:
https://docs.python.org/3/library/asyncio-subprocess.html.
使用这些帮助,但终止子进程的网络处理程序仍然存在问题。幸运的是,这里有一个解决方案:
https://docs.aiohttp.org/en/stable/web_advanced.html
aiojobs 有一个装饰器 "atomic" 可以保护进程直到它完成。因此,这些行的代码将起作用:
from aiojobs.aiohttp import setup, atomic
import asyncio
import os
from aiohttp import web
@atomic
async def ingest_pipeline(request):
#be careful what you pass through to shell, lest you
#give away the keys to the kingdom
shell_command = "[your command here]"
response_text = f"running {shell_command}"
response_code = 200
response = web.Response(text=response_text, status=response_code)
await response.prepare(request)
await response.write_eof()
ingestion_process = await asyncio.create_subprocess_shell(shell_command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE)
stdout, stderr = await ingestion_process.communicate()
return response
def init_func(argv):
app = web.Application()
setup(app)
app.router.add_get('/ingest_pipeline', ingest_pipeline)
return app
这是非常简单的框架,但可能会帮助其他人寻找临时内部解决方案的快速框架。
我希望能够通过 Web 请求以简单的方式触发长 运行ning python 脚本。此外,我希望能够在初始副本仍 运行ning.
时触发具有不同参数的脚本的其他副本我研究了 flask、aiohttp 和排队的可能性。 Flask 和 aiohttp 的设置开销似乎最少。我计划通过 subprocess.run 执行现有的 python 脚本(但是,我确实考虑过将脚本重构为可用于 Web 响应功能的库)。
使用 aiohttp,我正在尝试类似的操作: ingestion_service.py:
from aiohttp import web
from pprint import pprint
routes = web.RouteTableDef()
@routes.get("/ingest_pipeline")
async def test_ingest_pipeline(request):
'''
Get the job_conf specified from the request and activate the script
'''
#subprocess.run the command with lookup of job conf file
response = web.Response(text=f"Received data ingestion request")
await response.prepare(request)
await response.write_eof()
#eventually this would be subprocess.run call
time.sleep(80)
return response
def init_func(argv):
app = web.Application()
app.add_routes(routes)
return app
但是虽然初始请求 returns 立即执行,但后续请求会阻塞,直到初始请求完成。我正在 运行 通过以下方式安装服务器:
python -m aiohttp.web -H localhost -P 8080 ingestion_service:init_func
我知道多线程和并发可能提供比 asyncio 更好的解决方案。在这种情况下,我不是在寻找一个强大的解决方案,只是让我可以通过 http 请求一次 运行 多个脚本,理想情况下内存成本最低。
好的,我在做的事情有几个问题。也就是说,time.sleep() 是阻塞的,所以应该使用 asyncio.sleep()。但是,由于我对生成子进程感兴趣,我可以使用 asyncio.subprocess 以非阻塞方式执行此操作。
注意:
使用这些帮助,但终止子进程的网络处理程序仍然存在问题。幸运的是,这里有一个解决方案: https://docs.aiohttp.org/en/stable/web_advanced.html
aiojobs 有一个装饰器 "atomic" 可以保护进程直到它完成。因此,这些行的代码将起作用:
from aiojobs.aiohttp import setup, atomic
import asyncio
import os
from aiohttp import web
@atomic
async def ingest_pipeline(request):
#be careful what you pass through to shell, lest you
#give away the keys to the kingdom
shell_command = "[your command here]"
response_text = f"running {shell_command}"
response_code = 200
response = web.Response(text=response_text, status=response_code)
await response.prepare(request)
await response.write_eof()
ingestion_process = await asyncio.create_subprocess_shell(shell_command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE)
stdout, stderr = await ingestion_process.communicate()
return response
def init_func(argv):
app = web.Application()
setup(app)
app.router.add_get('/ingest_pipeline', ingest_pipeline)
return app
这是非常简单的框架,但可能会帮助其他人寻找临时内部解决方案的快速框架。