如何将 futures::Stream 写入磁盘而不先将其完全存储在内存中?
How do I write a futures::Stream to disk without storing it entirely in memory first?
这里有一个使用 Rusoto S3 下载文件的示例:
问题是它看起来像是将整个文件下载到内存中,然后将其写入磁盘,因为它使用 write_all
method which takes an array of bytes, not a stream. How can I use the StreamingBody
, which implements futures::Stream
将文件流式传输到磁盘?
由于 StreamingBody
实现了 Stream<Item = Vec<u8>, Error = Error>
,我们可以构造一个 MCVE 来表示:
extern crate futures; // 0.1.25
use futures::{prelude::*, stream};
type Error = Box<std::error::Error>;
fn streaming_body() -> impl Stream<Item = Vec<u8>, Error = Error> {
const DUMMY_DATA: &[&[u8]] = &[b"0123", b"4567", b"89AB", b"CDEF"];
let iter_of_owned_bytes = DUMMY_DATA.iter().map(|&b| b.to_owned());
stream::iter_ok(iter_of_owned_bytes)
}
然后我们可以通过某种方式得到一个"streaming body"并使用Stream::for_each
来处理Stream
中的每个元素。在这里,我们只是用一些提供的输出位置调用 write_all
:
use std::{fs::File, io::Write};
fn save_to_disk(mut file: impl Write) -> impl Future<Item = (), Error = Error> {
streaming_body().for_each(move |chunk| file.write_all(&chunk).map_err(Into::into))
}
然后我们可以写一些测试 main:
fn main() {
let mut file = Vec::new();
{
let fut = save_to_disk(&mut file);
fut.wait().expect("Could not drive future");
}
assert_eq!(file, b"0123456789ABCDEF");
}
关于这个简单实现的质量的重要说明:
对 write_all
的调用可能会阻塞,您不应该在异步程序中这样做。最好将阻塞工作交给线程池。
Future::wait
的用法强制线程阻塞,直到未来完成,这对测试非常有用,但对于您的实际用例可能不正确。
另请参阅:
这里有一个使用 Rusoto S3 下载文件的示例:
问题是它看起来像是将整个文件下载到内存中,然后将其写入磁盘,因为它使用 write_all
method which takes an array of bytes, not a stream. How can I use the StreamingBody
, which implements futures::Stream
将文件流式传输到磁盘?
由于 StreamingBody
实现了 Stream<Item = Vec<u8>, Error = Error>
,我们可以构造一个 MCVE 来表示:
extern crate futures; // 0.1.25
use futures::{prelude::*, stream};
type Error = Box<std::error::Error>;
fn streaming_body() -> impl Stream<Item = Vec<u8>, Error = Error> {
const DUMMY_DATA: &[&[u8]] = &[b"0123", b"4567", b"89AB", b"CDEF"];
let iter_of_owned_bytes = DUMMY_DATA.iter().map(|&b| b.to_owned());
stream::iter_ok(iter_of_owned_bytes)
}
然后我们可以通过某种方式得到一个"streaming body"并使用Stream::for_each
来处理Stream
中的每个元素。在这里,我们只是用一些提供的输出位置调用 write_all
:
use std::{fs::File, io::Write};
fn save_to_disk(mut file: impl Write) -> impl Future<Item = (), Error = Error> {
streaming_body().for_each(move |chunk| file.write_all(&chunk).map_err(Into::into))
}
然后我们可以写一些测试 main:
fn main() {
let mut file = Vec::new();
{
let fut = save_to_disk(&mut file);
fut.wait().expect("Could not drive future");
}
assert_eq!(file, b"0123456789ABCDEF");
}
关于这个简单实现的质量的重要说明:
对
write_all
的调用可能会阻塞,您不应该在异步程序中这样做。最好将阻塞工作交给线程池。Future::wait
的用法强制线程阻塞,直到未来完成,这对测试非常有用,但对于您的实际用例可能不正确。
另请参阅: