按顺序执行期货集合

Executing a collection of futures sequentially

我有一组期货,我想将它们组合成一个单一的期货,让它们按顺序执行。

我查看了 futures_ordered 函数。似乎 return 结果是按顺序执行的,但期货是同时执行的。

我试过fold the futures, combining them with and_then。然而,这对类型系统来说很棘手。

let tasks = vec![ok(()), ok(()), ok(())];

let combined_task = tasks.into_iter().fold(
    ok(()),                             // seed
    |acc, task| acc.and_then(|_| task), // accumulator
);

playground

这会产生以下错误:

error[E0308]: mismatched types
  --> src/main.rs:10:21
   |
10 |         |acc, task| acc.and_then(|_| task), // accumulator
   |                     ^^^^^^^^^^^^^^^^^^^^^^ expected struct `futures::FutureResult`, found struct `futures::AndThen`
   |
   = note: expected type `futures::FutureResult<_, _>`
              found type `futures::AndThen<futures::FutureResult<_, _>, futures::FutureResult<(), _>, [closure@src/main.rs:10:34: 10:42 task:_]>`

我可能处理错了,但我 运行 没主意了。

Stream 有一个函数 buffered 允许您限制同时轮询的期货数量。

如果您有一系列期货,您可以创建一个流并使用 buffered,如下所示:

let tasks = vec![future1, future2];
let stream = ::futures::stream::iter_ok(tasks);
let mut when_result_ready = stream.buffered(1);

when_result_ready 现在将是一个 Stream 实现,一次只轮询一个未来,并在每个未来完成后移动到下一个。

更新

根据评论和分析,buffered 的开销很大,因此另一种解决方案是将每个 Future 转换为 Streamflatten

iter_ok(tasks).map(|f|f.into_stream()).flatten()

flatten 声明 "each individual stream will get exhausted before moving on to the next." 表示在前一个完成之前不会轮询 Future。在我的本地分析中,这似乎比 buffered 方法快 80%。


我上面的两个答案都会产生 Stream 个结果,其中每个来源 Future 被顺序轮询并返回结果。提问者实际要求的只是最后一个 Future 而不是每个来源的结果 Future,如果是这样的话,Stefan 的答案可能更有用并且证明有更好的性能。

作为,你的类型太具体了。

您可以设想 fold 的实现方式如下:

let (task0, task1, task2) = (ok(()), ok(()), ok(()));

let mut combined_task = ok(()); // seed
combined_task = combined_task.and_then(|_| task0); 
combined_task = combined_task.and_then(|_| task1); 
combined_task = combined_task.and_then(|_| task2); 

变量 combined_task 需要更新为相同类型 的新值 。由于我们从 ok(()) 开始,这就是每个步骤需要 return 的类型。但是,and_then的return类型不同;这是一个 AndThen。事实上,AndThen 是一个包含闭包和底层 future 的泛型类型,因此每一步都会产生一个可能具有不同大小的不同类型:

  1. FutureResult<()>
  2. AndThen<FutureResult<()>, closure0>
  3. AndThen<AndThen<FutureResult<()>, closure0>, closure1>
  4. AndThen<AndThen<AndThen<FutureResult<()>, closure0>, closure1>, closure2>

相反,您可以通过在每个步骤中生成 盒装特征对象 来创建统一类型:

let (task0, task1, task2) = (ok(()), ok(()), ok(()));

let mut combined_task: Box<Future<Item = (), Error = ()>> = Box::new(ok(())); // seed
combined_task = Box::new(combined_task.and_then(|_| task0)); 
combined_task = Box::new(combined_task.and_then(|_| task1)); 
combined_task = Box::new(combined_task.and_then(|_| task2)); 
  1. Box<Future<Item = (), Error = ()>>
  2. Box<Future<Item = (), Error = ()>>
  3. Box<Future<Item = (), Error = ()>>
  4. Box<Future<Item = (), Error = ()>>

转换回 fold 语法:

let combined_task: Box<Future<Item = (), Error = ()>> =
    tasks.into_iter().fold(Box::new(ok(())), |acc, task| {
        Box::new(acc.and_then(|_| task))
    });

另请参阅:

合并iter_ok and Stream::for_each:

use futures::Stream;
use futures::future::ok;
use futures::stream::iter_ok;

let tasks = vec![ok(()), ok(()), ok(())];

let combined_task = iter_ok::<_, ()>(tasks).for_each(|f| f);

iter_ok 生成传递项的流,并且从不抛出错误(这就是为什么有时需要修复错误类型的原因)。传递给 for_each 的闭包然后 returns a Future 为每个项目 运行 - 这里只是传入的项目。

for_each 然后驱动每个 returned 未来完成,然后再移动到下一个,就像你想要的那样。它还会在遇到第一个错误时中止,并要求内部期货 return () 成功。

for_each 本身 return 是一个 Future 将失败(如上所述)或 return () 完成。

test tests::bench_variant_buffered ... bench:      22,356 ns/iter (+/- 1,816)
test tests::bench_variant_boxed ...    bench:       8,575 ns/iter (+/- 1,042)
test tests::bench_variant_for_each ... bench:       4,070 ns/iter (+/- 531)

当我需要这样的东西时(主要是因为我正在调试一个问题)我最终写了一个 seq 组合器组成 loop_fn like so:

fn seq<I>(
    i: I,
) -> impl Future<Item = Vec<<I::Item as IntoFuture>::Item>, Error = <I::Item as IntoFuture>::Error>
where
    I: IntoIterator,
    I::Item: IntoFuture,
{
    let iter = i.into_iter();
    loop_fn((vec![], iter), |(mut output, mut iter)| {
        let fut = if let Some(next) = iter.next() {
            Either::A(next.into_future().map(|v| Some(v)))
        } else {
            Either::B(future::ok(None))
        };

        fut.and_then(move |val| {
            if let Some(val) = val {
                output.push(val);
                Ok(Loop::Continue((output, iter)))
            } else {
                Ok(Loop::Break(output))
            }
        })
    })
}

就我而言(稳定 async/await)这段代码非常有用:

use futures::{stream, StreamExt};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + 'static>> {
    let data = vec![1,2,3];

    stream::iter(data).for_each(|id| async move {
        let request = async { id }; // async io request
        let res = request.await;
        println!("res: {:?}", res);
        ()
    }).await;
    
    Ok(())
}

https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=ad5feaf0cbb3597730c22df2eaf4a606