我如何使用电机的 open_download_stream 与 FastAPI 的 StreamingResponse 一起工作?

How can I use motor's open_download_stream work with FastAPI's StreamingResponse?

我正在构建一个 FastAPI 端点,Web 客户端用户基本上可以下载存储在 MongoDB 中作为 GridFS 块的文件。但是,FastAPI 的 StreamingResponse doesn't take the supposedly file-like AsyncIOMotorGridOut object returned by motor's open_download_stream 方法。

我已经有了一个端点,它可以获取表单中的文件并将它们上传到 MongoDB。我希望类似的下载帮助函数像这样简单:

async def upload_file(db, file: UploadFile):
    """ Uploads file to MongoDB GridFS file system and returns ID to be stored with collection document """
    fs = AsyncIOMotorGridFSBucket(db)
    file_id = await fs.upload_from_stream(
        file.filename,
        file.file,
        # chunk_size_bytes=255*1024*1024, #default 255kB
        metadata={"contentType": file.content_type})
    return file_id

我的第一次尝试是使用这样的助手:

async def download_file(db, file_id):
    """Returns  AsyncIOMotorGridOut (non-iterable file-like object)"""
    fs = AsyncIOMotorGridFSBucket(db)
    stream = await fs.open_download_stream(file_id)
    # return download_streamer(stream)
    return stream

我的 FastAPI 端点如下所示:

app.get("/file/{file_id}")
async def get_file(file_id):
    file = await download_file(db, file_id)
    return StreamingResponse(file, media_type=file.content_type)

尝试下载具有有效 file_id 的文件时出现此错误:TypeError: 'AsyncIOMotorGridOut' object is not an iterator

我的第二次尝试是让生成器迭代文件块:

async def download_streamer(file: AsyncIOMotorGridOut):
    """ Returns generator file-like object to be served by StreamingResponse
    https://fastapi.tiangolo.com/advanced/custom-response/#streamingresponse
    """
    chunk_size = 255*1024*1024
    for chunk in await file.readchunk():
        print(f"chunk: {chunk}")
        yield chunk

然后我在 download_file 助手中使用注释 return download_streamer(stream),但由于某种原因,每个块只是 255 的整数。

使用 motor 从 MongoDB 中获取文件并将其作为 FastAPI Web 响应流式传输而不使用临时文件的最佳方法是什么? (我无权访问硬盘驱动器,我不想将整个文件存储在内存中 - 我只想通过 FastAPI 将文件从 MongoDB 流式传输到客户端,一次一个块)。

我的解决方案是根据 . Such an iterator works with the async variant of FastAPI's StreamingResponse, and reads one GridFS chunk at a time (defaults to 255KB per motor docs) using readchunk() 方法创建一个恰好在 Python 3.6+ 语法中的生成器。当使用 upload_from_stream() 将文件存储在 MongoDB 中时设置此块大小。一个可选的实现是使用 .read(n) 一次读取 n 个字节。我选择使用 readchunk(),因此在流期间一次恰好获取 1 个数据库文档(每个 GridFS 文件被分解成块并一次存储一个块在数据库中)

async def chunk_generator(grid_out):
    while True:
        # chunk = await grid_out.read(1024)
        chunk = await grid_out.readchunk()
        if not chunk:
            break
        yield chunk


async def download_file(db, file_id):
    """Returns iterator over AsyncIOMotorGridOut object"""
    fs = AsyncIOMotorGridFSBucket(db)
    grid_out = await fs.open_download_stream(file_id)
    return chunk_generator(grid_out)

未来的改进是 download_file() return 一个元组,以便不仅包括生成器,还包括 ContentType.

这样的元数据