运行 FastApi 中的 Asyncio 子进程导致 NotImplementedError

Running an Asyncio Subprocess in FastApi results in NotImplementedError

我正在尝试在 FastApi 路由中 运行 Subprocess,但执行结果为 NotImplementedError。我读过关于这个问题的类似问题:

Asyncio.create_subprocess_exec NotImplementedError - Fastapi Background Task

但他们似乎没有任何可行的解决方案。

我的 FastApi 路由是这样的:

@app.get("/test/subprocess")
async def subprocess_test():
    parsed_json = await(probe_video_file(Path(r"G:\ffmpeg testing\ffmpeg\ffprobe.exe"),
                                         Path(r"G:\ffmpeg testing\input_file\test_file.wmv")))
    print(parsed_json)
    return parsed_json

当我导航到这条路线时,出现了异常并且代码崩溃了。 probe_video_file 函数内部有一个子进程调用,如下所示:

async def probe_video_file(ffprobe_path: Path, file_to_probe: Path) -> dict:
    """
    Probes a video file with FFprobe.
    :param ffprobe_path: Path to ffprobe executable
    :param file_to_probe: Path to file to probe
    :returns Parsed JSON dict of the output
    """
    args = [f'{str(ffprobe_path)}', f'{str(file_to_probe)}']
    args += ["-hide_banner", "-loglevel", "fatal", "-show_error", "-show_format", "-show_streams", "-show_chapters",
             "-show_private_data", "-print_format", "json"]

    #sub process is here, and here is where the exception happens.
    proc = await asyncio.create_subprocess_exec(
        *args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)

    stdout, stderr = await proc.communicate()

    print(f'exited with {proc.returncode}]')
    if stdout:
        print(f'[stdout]\n{stdout.decode()}')
    if stderr:
        print(f'[stderr]\n{stderr.decode()}')
        raise Exception(f"Failed to probe file {file_to_probe}")

    return json.loads(stdout)

当我 运行 probe_video_file 没有 FastAPI 时,像这样:

async def main():
    json_output = await probe_video_file(Path(r"G:\ffmpeg testing\ffmpeg\ffprobe.exe"),
                                         Path(r"G:\ffmpeg testing\input_file\test_file.wmv"))
    print(json_output)


if __name__ == '__main__':
    asyncio.run(main())

它 运行 很好并打印出正确的输出:

[['G:\ffmpeg testing\ffmpeg\ffprobe.exe', 'G:\ffmpeg testing\input_file\test_file.wmv', '-hide_banner', '-loglevel', 'fatal', '-show_error', '-show_format', '-show_streams', '-show_chapters', '-show_private_data', '-print_format', 'json'] exited with 0]
[stdout]
{
    "streams": [
        {
            "index": 0,
            "codec_name": "wmav2",
            "codec_long_name": "Windows Media Audio 2",
            "codec_type": "audio",
            "codec_tag_string": "a[1][0][0]",
            "codec_tag": "0x0161",
            "sample_fmt": "fltp",
            "sample_rate": "48000",
            "channels": 2,
            "bits_per_sample": 0,
            "r_frame_rate": "0/0",
            "avg_frame_rate": "0/0",
            "time_base": "1/1000",
            "start_pts": 0,
            "start_time": "0.000000",
            "duration_ts": 2155050,
            "duration": "2155.050000",
            "bit_rate": "96000",
            "disposition": {
                "default": 0,
                "dub": 0,
                "original": 0,
                "comment": 0,
                "lyrics": 0,
                "karaoke": 0,
                "forced": 0,
                "hearing_impaired": 0,
                "visual_impaired": 0,
                "clean_effects": 0,
                "attached_pic": 0,
                "timed_thumbnails": 0
            },
            "tags": {
                "language": "eng"
            }
        },
        {
            "index": 1,
            "codec_name": "wmv3",
            "codec_long_name": "Windows Media Video 9",
            "profile": "Main",
            "codec_type": "video",
            "codec_tag_string": "WMV3",
            "codec_tag": "0x33564d57",
            "width": 850,
            "height": 480,
            "coded_width": 850,
            "coded_height": 480,
            "closed_captions": 0,
            "has_b_frames": 0,
            "pix_fmt": "yuv420p",
            "level": -99,
            "chroma_location": "left",
            "refs": 1,
            "r_frame_rate": "30000/1001",
            "avg_frame_rate": "30000/1001",
            "time_base": "1/1000",
            "start_pts": 0,
            "start_time": "0.000000",
            "duration_ts": 2155050,
            "duration": "2155.050000",
            "bit_rate": "2000000",
            "disposition": {
                "default": 0,
                "dub": 0,
                "original": 0,
                "comment": 0,
                "lyrics": 0,
                "karaoke": 0,
                "forced": 0,
                "hearing_impaired": 0,
                "visual_impaired": 0,
                "clean_effects": 0,
                "attached_pic": 0,
                "timed_thumbnails": 0
            },
            "tags": {
                "language": "eng"
            }
        }
    ],
    "chapters": [

    ],
    "format": {
        "filename": "G:\ffmpeg testing\input_file\test_file.wmv",
        "nb_streams": 2,
        "nb_programs": 0,
        "format_name": "asf",
        "format_long_name": "ASF (Advanced / Active Streaming Format)",
        "start_time": "0.000000",
        "duration": "2155.050000",
        "size": "567194391",
        "bit_rate": "2105545",
        "probe_score": 100,
        "tags": {
            "WMFSDKNeeded": "0.0.0.0000",
            "DeviceConformanceTemplate": "MP@HL",
            "WMFSDKVersion": "11.0.5721.5265",
            "IsVBR": "0"
        }
    }
}

{'streams': [{'index': 0, 'codec_name': 'wmav2', 'codec_long_name': 'Windows Media Audio 2', 'codec_type': 'audio', 'codec_tag_string': 'a[1][0][0]', 'codec_tag': '0x0161', 'sample_fmt': 'fltp', 'sample_rate': '48000', 'channels': 2, 'bits_per_sample': 0, 'r_frame_rate': '0/0', 'avg_frame_rate': '0/0', 'time_base': '1/1000', 'start_pts': 0, 'start_time': '0.000000', 'duration_ts': 2155050, 'duration': '2155.050000', 'bit_rate': '96000', 'disposition': {'default': 0, 'dub': 0, 'original': 0, 'comment': 0, 'lyrics': 0, 'karaoke': 0, 'forced': 0, 'hearing_impaired': 0, 'visual_impaired': 0, 'clean_effects': 0, 'attached_pic': 0, 'timed_thumbnails': 0}, 'tags': {'language': 'eng'}}, {'index': 1, 'codec_name': 'wmv3', 'codec_long_name': 'Windows Media Video 9', 'profile': 'Main', 'codec_type': 'video', 'codec_tag_string': 'WMV3', 'codec_tag': '0x33564d57', 'width': 850, 'height': 480, 'coded_width': 850, 'coded_height': 480, 'closed_captions': 0, 'has_b_frames': 0, 'pix_fmt': 'yuv420p', 'level': -99, 'chroma_location': 'left', 'refs': 1, 'r_frame_rate': '30000/1001', 'avg_frame_rate': '30000/1001', 'time_base': '1/1000', 'start_pts': 0, 'start_time': '0.000000', 'duration_ts': 2155050, 'duration': '2155.050000', 'bit_rate': '2000000', 'disposition': {'default': 0, 'dub': 0, 'original': 0, 'comment': 0, 'lyrics': 0, 'karaoke': 0, 'forced': 0, 'hearing_impaired': 0, 'visual_impaired': 0, 'clean_effects': 0, 'attached_pic': 0, 'timed_thumbnails': 0}, 'tags': {'language': 'eng'}}], 'chapters': [], 'format': {'filename': 'G:\ffmpeg testing\input_file\test_file.wmv', 'nb_streams': 2, 'nb_programs': 0, 'format_name': 'asf', 'format_long_name': 'ASF (Advanced / Active Streaming Format)', 'start_time': '0.000000', 'duration': '2155.050000', 'size': '567194391', 'bit_rate': '2105545', 'probe_score': 100, 'tags': {'WMFSDKNeeded': '0.0.0.0000', 'DeviceConformanceTemplate': 'MP@HL', 'WMFSDKVersion': '11.0.5721.5265', 'IsVBR': '0'}}}

Process finished with exit code 0

我已经尝试 'setting' 事件循环到 ProactorEventLoop 就像其他问题中建议的那样:

@app.on_event("startup")
async def startup_event():
    """Code runs at startup..."""
    loop = asyncio.ProactorEventLoop()
    asyncio.set_event_loop(loop)

但是没有效果。

这是异常回溯:

ERROR:    Exception in ASGI application
Traceback (most recent call last):
  File "E:\pycharm\my_project\venv\lib\site-packages\uvicorn\protocols\http\httptools_impl.py", line 375, in run_asgi
    result = await app(self.scope, self.receive, self.send)
  File "E:\pycharm\my_project\venv\lib\site-packages\uvicorn\middleware\proxy_headers.py", line 75, in __call__
    return await self.app(scope, receive, send)
  File "E:\pycharm\my_project\venv\lib\site-packages\fastapi\applications.py", line 208, in __call__
    await super().__call__(scope, receive, send)
  File "E:\pycharm\my_project\venv\lib\site-packages\starlette\applications.py", line 112, in __call__
    await self.middleware_stack(scope, receive, send)
  File "E:\pycharm\my_project\venv\lib\site-packages\starlette\middleware\errors.py", line 181, in __call__
    raise exc
  File "E:\pycharm\my_project\venv\lib\site-packages\starlette\middleware\errors.py", line 159, in __call__
    await self.app(scope, receive, _send)
  File "E:\pycharm\my_project\venv\lib\site-packages\starlette\middleware\cors.py", line 84, in __call__
    await self.app(scope, receive, send)
  File "E:\pycharm\my_project\venv\lib\site-packages\starlette\exceptions.py", line 82, in __call__
    raise exc
  File "E:\pycharm\my_project\venv\lib\site-packages\starlette\exceptions.py", line 71, in __call__
    await self.app(scope, receive, sender)
  File "E:\pycharm\my_project\venv\lib\site-packages\starlette\routing.py", line 656, in __call__
    await route.handle(scope, receive, send)
  File "E:\pycharm\my_project\venv\lib\site-packages\starlette\routing.py", line 259, in handle
    await self.app(scope, receive, send)
  File "E:\pycharm\my_project\venv\lib\site-packages\starlette\routing.py", line 61, in app
    response = await func(request)
  File "E:\pycharm\my_project\venv\lib\site-packages\fastapi\routing.py", line 226, in app
    raw_response = await run_endpoint_function(
  File "E:\pycharm\my_project\venv\lib\site-packages\fastapi\routing.py", line 159, in run_endpoint_function
    return await dependant.call(**values)
  File "E:\pycharm\my_project\src\backend\app\api\api.py", line 202, in subprocess_test
    parsed_json = await(probe_video_file(Path(r"G:\ffmpeg testing\ffmpeg\ffprobe.exe"),
  File "E:\pycharm\my_project\src\backend\app\ffmpeg\ffprobe.py", line 26, in probe_video_file
    proc = await asyncio.create_subprocess_exec(
  File "C:\Users\user\AppData\Local\Programs\Python\Python310\lib\asyncio\subprocess.py", line 218, in create_subprocess_exec
    transport, protocol = await loop.subprocess_exec(
  File "C:\Users\user\AppData\Local\Programs\Python\Python310\lib\asyncio\base_events.py", line 1652, in subprocess_exec
    transport = await self._make_subprocess_transport(
  File "C:\Users\user\AppData\Local\Programs\Python\Python310\lib\asyncio\base_events.py", line 493, in _make_subprocess_transport
    raise NotImplementedError
NotImplementedError
INFO:     127.0.0.1:58576 - "GET /test/subprocess HTTP/1.1" 500 Internal Server Error

有人知道怎么解决吗?据我了解,这与不支持此功能的 FastApi 的默认事件循环有关。但是我不知道如何使用 Asyncio 的默认事件循环设置或替换默认的 FastApi 事件循环。

编辑:

我认为重要的是要指出我使用的是 windows 10.

仔细查看 uvicorn 代码,似乎 reload=True 选项导致默认 ProactorEventLoop 在 windows 上更改为 SelectorEventLoop。您可以在没有 reload=True 的情况下尝试它,看看它是否有效(我现在无法测试)。但是如果你需要在 uvicorn 中强制使用特定的事件循环,你可以像这样子类化 uvicorn.Server

import asyncio
from asyncio.windows_events import ProactorEventLoop

from fastapi import FastAPI
from uvicorn import Config, Server

app = FastAPI()


class ProactorServer(Server):
    def run(self, sockets=None):
        loop = ProactorEventLoop()
        asyncio.set_event_loop(loop) # since this is the default in Python 3.10, explicit selection can also be omitted
        asyncio.run(self.serve(sockets=sockets))


config = Config(app=app, host="0.0.0.0", port=8000, reload=True)
server = ProactorServer(config=config)
server.run()

这应该允许您在 FastAPI 端点内使用 asyncio.create_subprocess_exec()