按顺序执行期货集合
Executing a collection of futures sequentially
我有一组期货,我想将它们组合成一个单一的期货,让它们按顺序执行。
我查看了 futures_ordered
函数。似乎 return 结果是按顺序执行的,但期货是同时执行的。
我试过fold
the futures, combining them with and_then
。然而,这对类型系统来说很棘手。
let tasks = vec![ok(()), ok(()), ok(())];
let combined_task = tasks.into_iter().fold(
ok(()), // seed
|acc, task| acc.and_then(|_| task), // accumulator
);
这会产生以下错误:
error[E0308]: mismatched types
--> src/main.rs:10:21
|
10 | |acc, task| acc.and_then(|_| task), // accumulator
| ^^^^^^^^^^^^^^^^^^^^^^ expected struct `futures::FutureResult`, found struct `futures::AndThen`
|
= note: expected type `futures::FutureResult<_, _>`
found type `futures::AndThen<futures::FutureResult<_, _>, futures::FutureResult<(), _>, [closure@src/main.rs:10:34: 10:42 task:_]>`
我可能处理错了,但我 运行 没主意了。
Stream
有一个函数 buffered
允许您限制同时轮询的期货数量。
如果您有一系列期货,您可以创建一个流并使用 buffered
,如下所示:
let tasks = vec![future1, future2];
let stream = ::futures::stream::iter_ok(tasks);
let mut when_result_ready = stream.buffered(1);
when_result_ready
现在将是一个 Stream
实现,一次只轮询一个未来,并在每个未来完成后移动到下一个。
更新
根据评论和分析,buffered
的开销很大,因此另一种解决方案是将每个 Future
转换为 Stream
和 flatten
:
iter_ok(tasks).map(|f|f.into_stream()).flatten()
flatten
声明 "each individual stream will get exhausted before moving on to the next." 表示在前一个完成之前不会轮询 Future
。在我的本地分析中,这似乎比 buffered
方法快 80%。
我上面的两个答案都会产生 Stream
个结果,其中每个来源 Future
被顺序轮询并返回结果。提问者实际要求的只是最后一个 Future
而不是每个来源的结果 Future
,如果是这样的话,Stefan 的答案可能更有用并且证明有更好的性能。
作为,你的类型太具体了。
您可以设想 fold
的实现方式如下:
let (task0, task1, task2) = (ok(()), ok(()), ok(()));
let mut combined_task = ok(()); // seed
combined_task = combined_task.and_then(|_| task0);
combined_task = combined_task.and_then(|_| task1);
combined_task = combined_task.and_then(|_| task2);
变量 combined_task
需要更新为相同类型 的新值 。由于我们从 ok(())
开始,这就是每个步骤需要 return 的类型。但是,and_then
的return类型不同;这是一个 AndThen
。事实上,AndThen
是一个包含闭包和底层 future 的泛型类型,因此每一步都会产生一个可能具有不同大小的不同类型:
FutureResult<()>
AndThen<FutureResult<()>, closure0>
AndThen<AndThen<FutureResult<()>, closure0>, closure1>
AndThen<AndThen<AndThen<FutureResult<()>, closure0>, closure1>, closure2>
相反,您可以通过在每个步骤中生成 盒装特征对象 来创建统一类型:
let (task0, task1, task2) = (ok(()), ok(()), ok(()));
let mut combined_task: Box<Future<Item = (), Error = ()>> = Box::new(ok(())); // seed
combined_task = Box::new(combined_task.and_then(|_| task0));
combined_task = Box::new(combined_task.and_then(|_| task1));
combined_task = Box::new(combined_task.and_then(|_| task2));
Box<Future<Item = (), Error = ()>>
Box<Future<Item = (), Error = ()>>
Box<Future<Item = (), Error = ()>>
Box<Future<Item = (), Error = ()>>
转换回 fold
语法:
let combined_task: Box<Future<Item = (), Error = ()>> =
tasks.into_iter().fold(Box::new(ok(())), |acc, task| {
Box::new(acc.and_then(|_| task))
});
另请参阅:
合并iter_ok
and Stream::for_each
:
use futures::Stream;
use futures::future::ok;
use futures::stream::iter_ok;
let tasks = vec![ok(()), ok(()), ok(())];
let combined_task = iter_ok::<_, ()>(tasks).for_each(|f| f);
iter_ok
生成传递项的流,并且从不抛出错误(这就是为什么有时需要修复错误类型的原因)。传递给 for_each
的闭包然后 returns a Future
为每个项目 运行 - 这里只是传入的项目。
for_each
然后驱动每个 returned 未来完成,然后再移动到下一个,就像你想要的那样。它还会在遇到第一个错误时中止,并要求内部期货 return ()
成功。
for_each
本身 return 是一个 Future
将失败(如上所述)或 return ()
完成。
test tests::bench_variant_buffered ... bench: 22,356 ns/iter (+/- 1,816)
test tests::bench_variant_boxed ... bench: 8,575 ns/iter (+/- 1,042)
test tests::bench_variant_for_each ... bench: 4,070 ns/iter (+/- 531)
当我需要这样的东西时(主要是因为我正在调试一个问题)我最终写了一个 seq
组合器组成 loop_fn
like so:
fn seq<I>(
i: I,
) -> impl Future<Item = Vec<<I::Item as IntoFuture>::Item>, Error = <I::Item as IntoFuture>::Error>
where
I: IntoIterator,
I::Item: IntoFuture,
{
let iter = i.into_iter();
loop_fn((vec![], iter), |(mut output, mut iter)| {
let fut = if let Some(next) = iter.next() {
Either::A(next.into_future().map(|v| Some(v)))
} else {
Either::B(future::ok(None))
};
fut.and_then(move |val| {
if let Some(val) = val {
output.push(val);
Ok(Loop::Continue((output, iter)))
} else {
Ok(Loop::Break(output))
}
})
})
}
就我而言(稳定 async/await
)这段代码非常有用:
use futures::{stream, StreamExt};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + 'static>> {
let data = vec![1,2,3];
stream::iter(data).for_each(|id| async move {
let request = async { id }; // async io request
let res = request.await;
println!("res: {:?}", res);
()
}).await;
Ok(())
}
我有一组期货,我想将它们组合成一个单一的期货,让它们按顺序执行。
我查看了 futures_ordered
函数。似乎 return 结果是按顺序执行的,但期货是同时执行的。
我试过fold
the futures, combining them with and_then
。然而,这对类型系统来说很棘手。
let tasks = vec![ok(()), ok(()), ok(())];
let combined_task = tasks.into_iter().fold(
ok(()), // seed
|acc, task| acc.and_then(|_| task), // accumulator
);
这会产生以下错误:
error[E0308]: mismatched types
--> src/main.rs:10:21
|
10 | |acc, task| acc.and_then(|_| task), // accumulator
| ^^^^^^^^^^^^^^^^^^^^^^ expected struct `futures::FutureResult`, found struct `futures::AndThen`
|
= note: expected type `futures::FutureResult<_, _>`
found type `futures::AndThen<futures::FutureResult<_, _>, futures::FutureResult<(), _>, [closure@src/main.rs:10:34: 10:42 task:_]>`
我可能处理错了,但我 运行 没主意了。
Stream
有一个函数 buffered
允许您限制同时轮询的期货数量。
如果您有一系列期货,您可以创建一个流并使用 buffered
,如下所示:
let tasks = vec![future1, future2];
let stream = ::futures::stream::iter_ok(tasks);
let mut when_result_ready = stream.buffered(1);
when_result_ready
现在将是一个 Stream
实现,一次只轮询一个未来,并在每个未来完成后移动到下一个。
更新
根据评论和分析,buffered
的开销很大,因此另一种解决方案是将每个 Future
转换为 Stream
和 flatten
:
iter_ok(tasks).map(|f|f.into_stream()).flatten()
flatten
声明 "each individual stream will get exhausted before moving on to the next." 表示在前一个完成之前不会轮询 Future
。在我的本地分析中,这似乎比 buffered
方法快 80%。
我上面的两个答案都会产生 Stream
个结果,其中每个来源 Future
被顺序轮询并返回结果。提问者实际要求的只是最后一个 Future
而不是每个来源的结果 Future
,如果是这样的话,Stefan 的答案可能更有用并且证明有更好的性能。
作为
您可以设想 fold
的实现方式如下:
let (task0, task1, task2) = (ok(()), ok(()), ok(()));
let mut combined_task = ok(()); // seed
combined_task = combined_task.and_then(|_| task0);
combined_task = combined_task.and_then(|_| task1);
combined_task = combined_task.and_then(|_| task2);
变量 combined_task
需要更新为相同类型 的新值 。由于我们从 ok(())
开始,这就是每个步骤需要 return 的类型。但是,and_then
的return类型不同;这是一个 AndThen
。事实上,AndThen
是一个包含闭包和底层 future 的泛型类型,因此每一步都会产生一个可能具有不同大小的不同类型:
FutureResult<()>
AndThen<FutureResult<()>, closure0>
AndThen<AndThen<FutureResult<()>, closure0>, closure1>
AndThen<AndThen<AndThen<FutureResult<()>, closure0>, closure1>, closure2>
相反,您可以通过在每个步骤中生成 盒装特征对象 来创建统一类型:
let (task0, task1, task2) = (ok(()), ok(()), ok(()));
let mut combined_task: Box<Future<Item = (), Error = ()>> = Box::new(ok(())); // seed
combined_task = Box::new(combined_task.and_then(|_| task0));
combined_task = Box::new(combined_task.and_then(|_| task1));
combined_task = Box::new(combined_task.and_then(|_| task2));
Box<Future<Item = (), Error = ()>>
Box<Future<Item = (), Error = ()>>
Box<Future<Item = (), Error = ()>>
Box<Future<Item = (), Error = ()>>
转换回 fold
语法:
let combined_task: Box<Future<Item = (), Error = ()>> =
tasks.into_iter().fold(Box::new(ok(())), |acc, task| {
Box::new(acc.and_then(|_| task))
});
另请参阅:
合并iter_ok
and Stream::for_each
:
use futures::Stream;
use futures::future::ok;
use futures::stream::iter_ok;
let tasks = vec![ok(()), ok(()), ok(())];
let combined_task = iter_ok::<_, ()>(tasks).for_each(|f| f);
iter_ok
生成传递项的流,并且从不抛出错误(这就是为什么有时需要修复错误类型的原因)。传递给 for_each
的闭包然后 returns a Future
为每个项目 运行 - 这里只是传入的项目。
for_each
然后驱动每个 returned 未来完成,然后再移动到下一个,就像你想要的那样。它还会在遇到第一个错误时中止,并要求内部期货 return ()
成功。
for_each
本身 return 是一个 Future
将失败(如上所述)或 return ()
完成。
test tests::bench_variant_buffered ... bench: 22,356 ns/iter (+/- 1,816)
test tests::bench_variant_boxed ... bench: 8,575 ns/iter (+/- 1,042)
test tests::bench_variant_for_each ... bench: 4,070 ns/iter (+/- 531)
当我需要这样的东西时(主要是因为我正在调试一个问题)我最终写了一个 seq
组合器组成 loop_fn
like so:
fn seq<I>(
i: I,
) -> impl Future<Item = Vec<<I::Item as IntoFuture>::Item>, Error = <I::Item as IntoFuture>::Error>
where
I: IntoIterator,
I::Item: IntoFuture,
{
let iter = i.into_iter();
loop_fn((vec![], iter), |(mut output, mut iter)| {
let fut = if let Some(next) = iter.next() {
Either::A(next.into_future().map(|v| Some(v)))
} else {
Either::B(future::ok(None))
};
fut.and_then(move |val| {
if let Some(val) = val {
output.push(val);
Ok(Loop::Continue((output, iter)))
} else {
Ok(Loop::Break(output))
}
})
})
}
就我而言(稳定 async/await
)这段代码非常有用:
use futures::{stream, StreamExt};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + 'static>> {
let data = vec![1,2,3];
stream::iter(data).for_each(|id| async move {
let request = async { id }; // async io request
let res = request.await;
println!("res: {:?}", res);
()
}).await;
Ok(())
}