我如何创建一个流,其中的项目基于流先前返回的项目?
How can I create a stream where the items are based on items that the stream previously returned?
我有一个函数可以根据参数生成 futures::Stream
。我想多次调用此函数并将流拼合在一起。使事情复杂化的是,我想将流返回的值作为原始函数的参数返回。
具体来说,我有一个函数 returns 数字流归零:
fn numbers_down_to_zero(v: i32) -> impl Stream<Item = i32> {
stream::iter((0..v).rev())
}
我想从 5 开始调用这个函数。还应该为返回的每个奇数值调用该函数。对 numbers_down_to_zero
的总调用集为:
numbers_down_to_zero(5);
numbers_down_to_zero(3);
numbers_down_to_zero(1);
numbers_down_to_zero(1);
正在制作
的总流
4
3
2
1
0
2
1
0
0
0
有什么技术可以做到这一点?
这些是我找到的部分解决方案,但由于各种原因缺乏。
使用具有内部可变性的组合器
我不喜欢这个解决方案,因为我认为这个一般问题不需要内部可变性,但这里需要它,因为借用检查器不知道对闭包的调用将如何交错。
use futures::{stream, Stream, StreamExt};
use std::collections::VecDeque;
fn x(v: i32) -> impl Stream<Item = i32> {
stream::iter((0..v).rev())
}
use std::{cell::RefCell, rc::Rc};
fn y0() -> impl Stream<Item = i32> {
let to_visit = Rc::new(RefCell::new(VecDeque::from(vec![5])));
let to_visit_b = to_visit.clone();
stream::unfold(to_visit, |to_visit| async {
let i = to_visit.borrow_mut().pop_back()?;
Some((x(i), to_visit))
})
.flatten()
.inspect(move |&x| {
if x % 2 != 0 {
to_visit_b.borrow_mut().push_front(x);
}
})
}
#[tokio::main]
async fn main() {
y0().for_each(|v| async move {
println!("v: {}", v);
})
.await;
}
Stream::poll_next
的自定义实现
我不喜欢这个解决方案,因为它很冗长并且需要棘手的 unsafe
难以推理的代码(我什至不确定我的代码是否正确!)
use futures::{stream, Stream, StreamExt};
use std::collections::VecDeque;
fn x(v: i32) -> impl Stream<Item = i32> {
stream::iter((0..v).rev())
}
use std::{
pin::Pin,
task::{Context, Poll},
};
struct X<St, C, R, S>
where
C: Fn(&mut St) -> Option<S>,
R: Fn(&mut St, &mut S::Item),
S: Stream,
{
state: St,
create: C,
review: R,
current: Option<S>,
}
impl<St, C, R, S> Stream for X<St, C, R, S>
where
C: Fn(&mut St) -> Option<S>,
R: Fn(&mut St, &mut S::Item),
S: Stream,
{
type Item = S::Item;
fn poll_next(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let (state, create, review, current) = unsafe {
let Self {
state,
create,
review,
current,
} = self.get_unchecked_mut();
(state, create, review, current)
};
loop {
if let Some(current) = current {
let v = unsafe { futures::ready!(Pin::new_unchecked(current).poll_next(ctx)) };
if let Some(mut v) = v {
review(state, &mut v);
return Poll::Ready(Some(v));
}
}
*current = create(state);
if current.is_none() {
return Poll::Ready(None);
}
}
}
}
fn y1() -> impl Stream<Item = i32> {
X {
state: VecDeque::from(vec![5]),
create: |to_visit| {
let i = to_visit.pop_back()?;
Some(x(i))
},
review: |to_visit, &mut x| {
if x % 2 != 0 {
to_visit.push_front(x);
}
},
current: None,
}
}
#[tokio::main]
async fn main() {
y1().for_each(|v| async move {
println!("v: {}", v);
})
.await;
}
使用频道(非工作)
这行不通,因为永远不会丢弃发送方,因为永远不会丢弃接收方,因为永远不会丢弃发送方...
除了不起作用之外,这还有很多缺点:
- 状态必须隐含地是一个队列(这与我想做的相匹配,但不是很笼统)。
- 需要我的函数变成
async
本身来推送初始值来访问。
- 我必须处理看似无关紧要的错误情况。
- 我必须在
then
闭包中克隆 Sender
。
use futures::{stream, Stream, StreamExt};
fn x(v: i32) -> impl Stream<Item = i32> {
stream::iter((0..v).rev())
}
use futures::channel::mpsc;
use futures::sink::SinkExt;
async fn y2() -> impl Stream<Item = i32> {
let (mut tx, rx) = mpsc::unbounded();
tx.send(5).await.unwrap();
rx.map(x).flatten().then(move |x| {
let mut tx = tx.clone();
async move {
if x % 2 != 0 {
tx.send(x).await.unwrap();
}
x
}
})
}
#[tokio::main]
async fn main() {
y2().await
.for_each(|v| async move {
println!("v: {}", v);
})
.await;
}
你可以用 unfold
解决这个问题。您将有一个 "state" 结构,它同时保留 "base stream" (在本例中为零)和将产生新流的项目列表,并将其用作 [= 的参数11=]展开时保持状态。
这样编译器就不必推断生命周期所有权,因为每次调用闭包时状态都可以移到 async
块中。
/// Base stream (counting down to zero).
fn f(n: i32) -> impl Stream<Item = i32> {
stream::iter((0..n).rev())
}
/// "Recursive" stream
fn g(n: i32) -> impl Stream<Item = i32> {
/// Helper struct to keep state while unfolding
struct StreamState<S> {
inner_stream: S,
item_queue: VecDeque<i32>,
}
// Build helper struct
let state = StreamState {
inner_stream: f(n),
item_queue: VecDeque::new(),
};
// Unfold with state
stream::unfold(state, |mut state| async move {
loop {
if let Some(item) = state.inner_stream.next().await {
// Iterate inner stream, and potentially push item to queue
if item % 2 == 1 {
state.item_queue.push_front(item);
}
break Some((item, state));
} else if let Some(item) = state.item_queue.pop_back() {
// If inner stream is exhausted, produce new stream from queue
// and repeat loop
state.inner_stream = f(item);
} else {
// If queue is empty, we are done
break None;
}
}
})
}
StreamExt::next
要求内部流实现 Unpin
,因此它不能用于任意流。您始终可以使用 Box::pin(stream)
代替,因为 Pin<Box<T>>
是 Unpin
并且如果 T: Stream
.
实现 Stream
通过(ab)使用 async / await,genawaiter
crate 设法模仿了今天 stable Rust 中的生成器语法。结合 futures::pin_mut
将值固定在堆栈上,这是一个无需分配且与任意流兼容的解决方案:
//# futures = "0.3"
//# genawaiter = { version = "0.2", features = ["futures03"] }
//# tokio = { version = "0.2", features = ["full"] }
use futures::{
pin_mut,
stream::{self, Stream, StreamExt},
};
use genawaiter::{generator_mut, stack::Co};
use std::collections::VecDeque;
async fn g(n: i32, co: Co<'_, i32>) {
let mut seeds = VecDeque::from(vec![n]);
while let Some(seed) = seeds.pop_back() {
let stream = f(seed);
pin_mut!(stream);
while let Some(x) = stream.next().await {
if x % 2 != 0 {
seeds.push_front(x);
}
co.yield_(x).await;
}
}
}
fn f(n: i32) -> impl Stream<Item = i32> {
stream::iter((0..n).rev())
}
#[tokio::main]
async fn main() {
generator_mut!(stream, |co| g(5, co));
stream
.for_each(|v| async move {
println!("v: {}", v);
})
.await;
}
一些缺点:
generator_mut
宏中有一个不安全的调用
- 接口有点漏。调用者可以看到一些实现细节。
通过一次堆分配,genawaiter::rc::Gen
可以摆脱所有这些。但同样,在 table 上分配还有其他选择。
use futures::{
pin_mut,
stream::{Stream, StreamExt},
};
use genawaiter::rc::Gen;
use std::collections::VecDeque;
fn g(n: i32) -> impl Stream<Item = i32> {
Gen::new(|co| async move {
let mut seeds = VecDeque::from(vec![n]);
while let Some(seed) = seeds.pop_back() {
let stream = f(seed);
pin_mut!(stream);
while let Some(x) = stream.next().await {
if x % 2 != 0 {
seeds.push_front(x);
}
co.yield_(x).await;
}
}
})
}
我有一个函数可以根据参数生成 futures::Stream
。我想多次调用此函数并将流拼合在一起。使事情复杂化的是,我想将流返回的值作为原始函数的参数返回。
具体来说,我有一个函数 returns 数字流归零:
fn numbers_down_to_zero(v: i32) -> impl Stream<Item = i32> {
stream::iter((0..v).rev())
}
我想从 5 开始调用这个函数。还应该为返回的每个奇数值调用该函数。对 numbers_down_to_zero
的总调用集为:
numbers_down_to_zero(5);
numbers_down_to_zero(3);
numbers_down_to_zero(1);
numbers_down_to_zero(1);
正在制作
的总流4
3
2
1
0
2
1
0
0
0
有什么技术可以做到这一点?
这些是我找到的部分解决方案,但由于各种原因缺乏。
使用具有内部可变性的组合器
我不喜欢这个解决方案,因为我认为这个一般问题不需要内部可变性,但这里需要它,因为借用检查器不知道对闭包的调用将如何交错。
use futures::{stream, Stream, StreamExt};
use std::collections::VecDeque;
fn x(v: i32) -> impl Stream<Item = i32> {
stream::iter((0..v).rev())
}
use std::{cell::RefCell, rc::Rc};
fn y0() -> impl Stream<Item = i32> {
let to_visit = Rc::new(RefCell::new(VecDeque::from(vec![5])));
let to_visit_b = to_visit.clone();
stream::unfold(to_visit, |to_visit| async {
let i = to_visit.borrow_mut().pop_back()?;
Some((x(i), to_visit))
})
.flatten()
.inspect(move |&x| {
if x % 2 != 0 {
to_visit_b.borrow_mut().push_front(x);
}
})
}
#[tokio::main]
async fn main() {
y0().for_each(|v| async move {
println!("v: {}", v);
})
.await;
}
Stream::poll_next
的自定义实现
我不喜欢这个解决方案,因为它很冗长并且需要棘手的 unsafe
难以推理的代码(我什至不确定我的代码是否正确!)
use futures::{stream, Stream, StreamExt};
use std::collections::VecDeque;
fn x(v: i32) -> impl Stream<Item = i32> {
stream::iter((0..v).rev())
}
use std::{
pin::Pin,
task::{Context, Poll},
};
struct X<St, C, R, S>
where
C: Fn(&mut St) -> Option<S>,
R: Fn(&mut St, &mut S::Item),
S: Stream,
{
state: St,
create: C,
review: R,
current: Option<S>,
}
impl<St, C, R, S> Stream for X<St, C, R, S>
where
C: Fn(&mut St) -> Option<S>,
R: Fn(&mut St, &mut S::Item),
S: Stream,
{
type Item = S::Item;
fn poll_next(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let (state, create, review, current) = unsafe {
let Self {
state,
create,
review,
current,
} = self.get_unchecked_mut();
(state, create, review, current)
};
loop {
if let Some(current) = current {
let v = unsafe { futures::ready!(Pin::new_unchecked(current).poll_next(ctx)) };
if let Some(mut v) = v {
review(state, &mut v);
return Poll::Ready(Some(v));
}
}
*current = create(state);
if current.is_none() {
return Poll::Ready(None);
}
}
}
}
fn y1() -> impl Stream<Item = i32> {
X {
state: VecDeque::from(vec![5]),
create: |to_visit| {
let i = to_visit.pop_back()?;
Some(x(i))
},
review: |to_visit, &mut x| {
if x % 2 != 0 {
to_visit.push_front(x);
}
},
current: None,
}
}
#[tokio::main]
async fn main() {
y1().for_each(|v| async move {
println!("v: {}", v);
})
.await;
}
使用频道(非工作)
这行不通,因为永远不会丢弃发送方,因为永远不会丢弃接收方,因为永远不会丢弃发送方...
除了不起作用之外,这还有很多缺点:
- 状态必须隐含地是一个队列(这与我想做的相匹配,但不是很笼统)。
- 需要我的函数变成
async
本身来推送初始值来访问。 - 我必须处理看似无关紧要的错误情况。
- 我必须在
then
闭包中克隆Sender
。
use futures::{stream, Stream, StreamExt};
fn x(v: i32) -> impl Stream<Item = i32> {
stream::iter((0..v).rev())
}
use futures::channel::mpsc;
use futures::sink::SinkExt;
async fn y2() -> impl Stream<Item = i32> {
let (mut tx, rx) = mpsc::unbounded();
tx.send(5).await.unwrap();
rx.map(x).flatten().then(move |x| {
let mut tx = tx.clone();
async move {
if x % 2 != 0 {
tx.send(x).await.unwrap();
}
x
}
})
}
#[tokio::main]
async fn main() {
y2().await
.for_each(|v| async move {
println!("v: {}", v);
})
.await;
}
你可以用 unfold
解决这个问题。您将有一个 "state" 结构,它同时保留 "base stream" (在本例中为零)和将产生新流的项目列表,并将其用作 [= 的参数11=]展开时保持状态。
这样编译器就不必推断生命周期所有权,因为每次调用闭包时状态都可以移到 async
块中。
/// Base stream (counting down to zero).
fn f(n: i32) -> impl Stream<Item = i32> {
stream::iter((0..n).rev())
}
/// "Recursive" stream
fn g(n: i32) -> impl Stream<Item = i32> {
/// Helper struct to keep state while unfolding
struct StreamState<S> {
inner_stream: S,
item_queue: VecDeque<i32>,
}
// Build helper struct
let state = StreamState {
inner_stream: f(n),
item_queue: VecDeque::new(),
};
// Unfold with state
stream::unfold(state, |mut state| async move {
loop {
if let Some(item) = state.inner_stream.next().await {
// Iterate inner stream, and potentially push item to queue
if item % 2 == 1 {
state.item_queue.push_front(item);
}
break Some((item, state));
} else if let Some(item) = state.item_queue.pop_back() {
// If inner stream is exhausted, produce new stream from queue
// and repeat loop
state.inner_stream = f(item);
} else {
// If queue is empty, we are done
break None;
}
}
})
}
StreamExt::next
要求内部流实现 Unpin
,因此它不能用于任意流。您始终可以使用 Box::pin(stream)
代替,因为 Pin<Box<T>>
是 Unpin
并且如果 T: Stream
.
Stream
通过(ab)使用 async / await,genawaiter
crate 设法模仿了今天 stable Rust 中的生成器语法。结合 futures::pin_mut
将值固定在堆栈上,这是一个无需分配且与任意流兼容的解决方案:
//# futures = "0.3"
//# genawaiter = { version = "0.2", features = ["futures03"] }
//# tokio = { version = "0.2", features = ["full"] }
use futures::{
pin_mut,
stream::{self, Stream, StreamExt},
};
use genawaiter::{generator_mut, stack::Co};
use std::collections::VecDeque;
async fn g(n: i32, co: Co<'_, i32>) {
let mut seeds = VecDeque::from(vec![n]);
while let Some(seed) = seeds.pop_back() {
let stream = f(seed);
pin_mut!(stream);
while let Some(x) = stream.next().await {
if x % 2 != 0 {
seeds.push_front(x);
}
co.yield_(x).await;
}
}
}
fn f(n: i32) -> impl Stream<Item = i32> {
stream::iter((0..n).rev())
}
#[tokio::main]
async fn main() {
generator_mut!(stream, |co| g(5, co));
stream
.for_each(|v| async move {
println!("v: {}", v);
})
.await;
}
一些缺点:
generator_mut
宏中有一个不安全的调用- 接口有点漏。调用者可以看到一些实现细节。
通过一次堆分配,genawaiter::rc::Gen
可以摆脱所有这些。但同样,在 table 上分配还有其他选择。
use futures::{
pin_mut,
stream::{Stream, StreamExt},
};
use genawaiter::rc::Gen;
use std::collections::VecDeque;
fn g(n: i32) -> impl Stream<Item = i32> {
Gen::new(|co| async move {
let mut seeds = VecDeque::from(vec![n]);
while let Some(seed) = seeds.pop_back() {
let stream = f(seed);
pin_mut!(stream);
while let Some(x) = stream.next().await {
if x % 2 != 0 {
seeds.push_front(x);
}
co.yield_(x).await;
}
}
})
}