HTTP服务器启动后台python脚本无阻塞

HTTP server kick-off background python script without blocking

我希望能够通过 Web 请求以简单的方式触发长 运行ning python 脚本。此外,我希望能够在初始副本仍 运行ning.

时触发具有不同参数的脚本的其他副本

我研究了 flask、aiohttp 和排队的可能性。 Flask 和 aiohttp 的设置开销似乎最少。我计划通过 subprocess.run 执行现有的 python 脚本(但是,我确实考虑过将脚本重构为可用于 Web 响应功能的库)。

使用 aiohttp,我正在尝试类似的操作: ingestion_service.py:

from aiohttp import web
from pprint import pprint
routes = web.RouteTableDef()

@routes.get("/ingest_pipeline")
async def test_ingest_pipeline(request):
    '''
    Get the job_conf specified from the request and activate the script
    '''
    #subprocess.run the command with lookup of job conf file
    response =  web.Response(text=f"Received data ingestion request")
    await response.prepare(request)
    await response.write_eof()

    #eventually this would be subprocess.run call
    time.sleep(80)

    return response

def init_func(argv):
    app = web.Application()
    app.add_routes(routes)
    return app

但是虽然初始请求 returns 立即执行,但后续请求会阻塞,直到初始请求完成。我正在 运行 通过以下方式安装服务器:

python -m aiohttp.web -H localhost -P 8080 ingestion_service:init_func

我知道多线程和并发可能提供比 asyncio 更好的解决方案。在这种情况下,我不是在寻找一个强大的解决方案,只是让我可以通过 http 请求一次 运行 多个脚本,理想情况下内存成本最低。

好的,我在做的事情有几个问题。也就是说,time.sleep() 是阻塞的,所以应该使用 asyncio.sleep()。但是,由于我对生成子进程感兴趣,我可以使用 asyncio.subprocess 以非阻塞方式执行此操作。 注意: https://docs.python.org/3/library/asyncio-subprocess.html.

使用这些帮助,但终止子进程的网络处理程序仍然存在问题。幸运的是,这里有一个解决方案: https://docs.aiohttp.org/en/stable/web_advanced.html

aiojobs 有一个装饰器 "atomic" 可以保护进程直到它完成。因此,这些行的代码将起作用:

from aiojobs.aiohttp import setup, atomic
import asyncio
import os

from aiohttp import web

@atomic
async def ingest_pipeline(request):
    #be careful what you pass through to shell, lest you
    #give away the keys to the kingdom
    shell_command = "[your command here]"
    response_text = f"running {shell_command}"
    response_code = 200
    response =  web.Response(text=response_text, status=response_code)
    await response.prepare(request)
    await response.write_eof()

    ingestion_process = await asyncio.create_subprocess_shell(shell_command,
                                                              stdout=asyncio.subprocess.PIPE,
                                                              stderr=asyncio.subprocess.PIPE)

    stdout, stderr = await ingestion_process.communicate()
    return response

def init_func(argv):
    app = web.Application()
    setup(app)
    app.router.add_get('/ingest_pipeline', ingest_pipeline)
    return app

这是非常简单的框架,但可能会帮助其他人寻找临时内部解决方案的快速框架。