如何使用 take_while 和 futures::Stream?

How to use take_while with futures::Stream?

我想了解 take_while() with futures::Stream; crate (0.1.25). Here's a piece of code (on playground):

我应该使用什么语法
use futures::{stream, Stream}; // 0.1.25

fn into_many(i: i32) -> impl Stream<Item = i32, Error = ()> {
    stream::iter_ok(0..i)
}

fn main() {
    println!("start:");
    let _ = into_many(10)
        // .take_while(|x| { x < 10 })
        .map(|x| {
            println!("number={}", x);
            x
        })
        .wait();
    for _ in foo {} // ← this (by @mcarton)

    println!("finish:");
}

主要目标是使用 take_while 成功确定 operators/commands 到 运行 呈现的游乐场的正确组合:当我取消注释我的 take_while() 时,它说

expected &i32, found integral variable | help: consider borrowing here: &10

如果我引用,它会说:

error[E0277]: the trait bound bool: futures::future::Future is not satisfied

这对我来说很奇怪。

wait returns 流的迭代器版本,但该迭代器仍然是惰性的,这意味着您需要迭代它才能真正执行闭包:

use futures::{stream, Stream}; // 0.1.25

fn into_many(i: i32) -> impl Stream<Item = i32, Error = ()> {
    stream::iter_ok(0..i)
}

fn main() {
    println!("start:");
    let foo = into_many(10)
        // .take_while(|x| { x < 10 })
        .map(|x| {
            println!("number={}", x);
            x
        })
        .wait();

    for _ in foo {} // ← this

    println!("finish:");
}

(link to playground)

take_while 期望关闭到 return 一个未来,或者可以转换为未来的东西。 bool 没有实现 IntoFuture,所以你必须在未来包装它。 future::ok return 一个立即准备好指定值的未来。

use futures::{future, stream, Stream}; // 0.1.25

fn into_many(i: i32) -> impl Stream<Item = i32, Error = ()> {
    stream::iter_ok(0..i)
}

fn main() {
    println!("start:");
    let foo = into_many(10)
        .take_while(|&x| { future::ok(x < 10) })
        .map(|x| {
            println!("number={}", x);
            x
        })
        .wait();

    for _ in foo {}

    println!("finish:");
}