我如何创建一个流,其中的项目基于流先前返回的项目?

How can I create a stream where the items are based on items that the stream previously returned?

我有一个函数可以根据参数生成 futures::Stream。我想多次调用此函数并将流拼合在一起。使事情复杂化的是,我想将流返回的值作为原始函数的参数返回。

具体来说,我有一个函数 returns 数字流归零:

fn numbers_down_to_zero(v: i32) -> impl Stream<Item = i32> {
    stream::iter((0..v).rev())
}

我想从 5 开始调用这个函数。还应该为返回的每个奇数值调用该函数。对 numbers_down_to_zero 的总调用集为:

numbers_down_to_zero(5);
numbers_down_to_zero(3);
numbers_down_to_zero(1);
numbers_down_to_zero(1);

正在制作

的总流
4
3
2
1
0
2
1
0
0
0

有什么技术可以做到这一点?

这些是我找到的部分解决方案,但由于各种原因缺乏。

使用具有内部可变性的组合器

我不喜欢这个解决方案,因为我认为这个一般问题不需要内部可变性,但这里需要它,因为借用检查器不知道对闭包的调用将如何交错。

use futures::{stream, Stream, StreamExt};
use std::collections::VecDeque;

fn x(v: i32) -> impl Stream<Item = i32> {
    stream::iter((0..v).rev())
}

use std::{cell::RefCell, rc::Rc};

fn y0() -> impl Stream<Item = i32> {
    let to_visit = Rc::new(RefCell::new(VecDeque::from(vec![5])));
    let to_visit_b = to_visit.clone();

    stream::unfold(to_visit, |to_visit| async {
        let i = to_visit.borrow_mut().pop_back()?;

        Some((x(i), to_visit))
    })
    .flatten()
    .inspect(move |&x| {
        if x % 2 != 0 {
            to_visit_b.borrow_mut().push_front(x);
        }
    })
}

#[tokio::main]
async fn main() {
    y0().for_each(|v| async move {
        println!("v: {}", v);
    })
    .await;
}

playground

Stream::poll_next

的自定义实现

我不喜欢这个解决方案,因为它很冗长并且需要棘手的 unsafe 难以推理的代码(我什至不确定我的代码是否正确!)

use futures::{stream, Stream, StreamExt};
use std::collections::VecDeque;

fn x(v: i32) -> impl Stream<Item = i32> {
    stream::iter((0..v).rev())
}

use std::{
    pin::Pin,
    task::{Context, Poll},
};

struct X<St, C, R, S>
where
    C: Fn(&mut St) -> Option<S>,
    R: Fn(&mut St, &mut S::Item),
    S: Stream,
{
    state: St,
    create: C,
    review: R,
    current: Option<S>,
}

impl<St, C, R, S> Stream for X<St, C, R, S>
where
    C: Fn(&mut St) -> Option<S>,
    R: Fn(&mut St, &mut S::Item),
    S: Stream,
{
    type Item = S::Item;

    fn poll_next(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let (state, create, review, current) = unsafe {
            let Self {
                state,
                create,
                review,
                current,
            } = self.get_unchecked_mut();
            (state, create, review, current)
        };

        loop {
            if let Some(current) = current {
                let v = unsafe { futures::ready!(Pin::new_unchecked(current).poll_next(ctx)) };
                if let Some(mut v) = v {
                    review(state, &mut v);
                    return Poll::Ready(Some(v));
                }
            }

            *current = create(state);
            if current.is_none() {
                return Poll::Ready(None);
            }
        }
    }
}

fn y1() -> impl Stream<Item = i32> {
    X {
        state: VecDeque::from(vec![5]),
        create: |to_visit| {
            let i = to_visit.pop_back()?;

            Some(x(i))
        },
        review: |to_visit, &mut x| {
            if x % 2 != 0 {
                to_visit.push_front(x);
            }
        },
        current: None,
    }
}

#[tokio::main]
async fn main() {
    y1().for_each(|v| async move {
        println!("v: {}", v);
    })
    .await;
}

playground


使用频道(非工作)

这行不通,因为永远不会丢弃发送方,因为永远不会丢弃接收方,因为永远不会丢弃发送方...

除了不起作用之外,这还有很多缺点:

  • 状态必须隐含地是一个队列(这与我想做的相匹配,但不是很笼统)。
  • 需要我的函数变成async本身来推送初始值来访问。
  • 我必须处理看似无关紧要的错误情况。
  • 我必须在 then 闭包中克隆 Sender
use futures::{stream, Stream, StreamExt};

fn x(v: i32) -> impl Stream<Item = i32> {
    stream::iter((0..v).rev())
}

use futures::channel::mpsc;
use futures::sink::SinkExt;

async fn y2() -> impl Stream<Item = i32> {
    let (mut tx, rx) = mpsc::unbounded();

    tx.send(5).await.unwrap();

    rx.map(x).flatten().then(move |x| {
        let mut tx = tx.clone();
        async move {
            if x % 2 != 0 {
                tx.send(x).await.unwrap();
            }
            x
        }
    })
}

#[tokio::main]
async fn main() {
    y2().await
        .for_each(|v| async move {
            println!("v: {}", v);
        })
        .await;
}

playground

你可以用 unfold 解决这个问题。您将有一个 "state" 结构,它同时保留 "base stream" (在本例中为零)和将产生新流的项目列表,并将其用作 [= 的参数11=]展开时保持状态。

这样编译器就不必推断生命周期所有权,因为每次调用闭包时状态都可以移到 async 块中。

/// Base stream (counting down to zero).
fn f(n: i32) -> impl Stream<Item = i32> {
    stream::iter((0..n).rev())
}

/// "Recursive" stream
fn g(n: i32) -> impl Stream<Item = i32> {
    /// Helper struct to keep state while unfolding
    struct StreamState<S> {
        inner_stream: S,
        item_queue: VecDeque<i32>,
    }

    // Build helper struct
    let state = StreamState {
        inner_stream: f(n),
        item_queue: VecDeque::new(),
    };

    // Unfold with state
    stream::unfold(state, |mut state| async move {
        loop {
            if let Some(item) = state.inner_stream.next().await {
                // Iterate inner stream, and potentially push item to queue
                if item % 2 == 1 {
                    state.item_queue.push_front(item);
                }
                break Some((item, state));
            } else if let Some(item) = state.item_queue.pop_back() {
                // If inner stream is exhausted, produce new stream from queue
                // and repeat loop
                state.inner_stream = f(item);
            } else {
                // If queue is empty, we are done
                break None;
            }
        }
    })
}

Full playground example

StreamExt::next 要求内部流实现 Unpin,因此它不能用于任意流。您始终可以使用 Box::pin(stream) 代替,因为 Pin<Box<T>>Unpin 并且如果 T: Stream.

实现 Stream

通过(ab)使用 async / await,genawaiter crate 设法模仿了今天 stable Rust 中的生成器语法。结合 futures::pin_mut 将值固定在堆栈上,这是一个无需分配且与任意流兼容的解决方案:

//# futures = "0.3"
//# genawaiter = { version = "0.2", features = ["futures03"] }
//# tokio = { version = "0.2", features = ["full"] }
use futures::{
    pin_mut,
    stream::{self, Stream, StreamExt},
};
use genawaiter::{generator_mut, stack::Co};
use std::collections::VecDeque;

async fn g(n: i32, co: Co<'_, i32>) {
    let mut seeds = VecDeque::from(vec![n]);
    while let Some(seed) = seeds.pop_back() {
        let stream = f(seed);
        pin_mut!(stream);
        while let Some(x) = stream.next().await {
            if x % 2 != 0 {
                seeds.push_front(x);
            }
            co.yield_(x).await;
        }
    }
}

fn f(n: i32) -> impl Stream<Item = i32> {
    stream::iter((0..n).rev())
}

#[tokio::main]
async fn main() {
    generator_mut!(stream, |co| g(5, co));
    stream
        .for_each(|v| async move {
            println!("v: {}", v);
        })
        .await;
}

一些缺点:

  • generator_mut 宏中有一个不安全的调用
  • 接口有点漏。调用者可以看到一些实现细节。

通过一次堆分配,genawaiter::rc::Gen 可以摆脱所有这些。但同样,在 table 上分配还有其他选择。

use futures::{
    pin_mut,
    stream::{Stream, StreamExt},
};
use genawaiter::rc::Gen;
use std::collections::VecDeque;

fn g(n: i32) -> impl Stream<Item = i32> {
    Gen::new(|co| async move {
        let mut seeds = VecDeque::from(vec![n]);
        while let Some(seed) = seeds.pop_back() {
            let stream = f(seed);
            pin_mut!(stream);
            while let Some(x) = stream.next().await {
                if x % 2 != 0 {
                    seeds.push_front(x);
                }
                co.yield_(x).await;
            }
        }
    })
}