如何将 futures::Stream 写入磁盘而不先将其完全存储在内存中?

How do I write a futures::Stream to disk without storing it entirely in memory first?

这里有一个使用 Rusoto S3 下载文件的示例:

问题是它看起来像是将整个文件下载到内存中,然后将其写入磁盘,因为它使用 write_all method which takes an array of bytes, not a stream. How can I use the StreamingBody, which implements futures::Stream 将文件流式传输到磁盘?

由于 StreamingBody 实现了 Stream<Item = Vec<u8>, Error = Error>,我们可以构造一个 MCVE 来表示:

extern crate futures; // 0.1.25

use futures::{prelude::*, stream};

type Error = Box<std::error::Error>;

fn streaming_body() -> impl Stream<Item = Vec<u8>, Error = Error> {
    const DUMMY_DATA: &[&[u8]] = &[b"0123", b"4567", b"89AB", b"CDEF"];
    let iter_of_owned_bytes = DUMMY_DATA.iter().map(|&b| b.to_owned());
    stream::iter_ok(iter_of_owned_bytes)
}

然后我们可以通过某种方式得到一个"streaming body"并使用Stream::for_each来处理Stream中的每个元素。在这里,我们只是用一些提供的输出位置调用 write_all

use std::{fs::File, io::Write};

fn save_to_disk(mut file: impl Write) -> impl Future<Item = (), Error = Error> {
    streaming_body().for_each(move |chunk| file.write_all(&chunk).map_err(Into::into))
}

然后我们可以写一些测试 main:

fn main() {
    let mut file = Vec::new();

    {
        let fut = save_to_disk(&mut file);
        fut.wait().expect("Could not drive future");
    }

    assert_eq!(file, b"0123456789ABCDEF");
}

关于这个简单实现的质量的重要说明:

  1. write_all 的调用可能会阻塞,您不应该在异步程序中这样做。最好将阻塞工作交给线程池。

  2. Future::wait 的用法强制线程阻塞,直到未来完成,这对测试非常有用,但对于您的实际用例可能不正确。

另请参阅: