如何将迭代器在成功时转换为流或在失败时转换为空流?
How do I convert an iterator into a stream on success or an empty stream on failure?
我想要 regular iterator and turn it into a stream 以便我可以进行进一步的流处理。问题是我可能有一个迭代器或一个错误要处理。我想我已经很接近这个了:
#[macro_use]
extern crate log;
extern crate futures; // 0.1.21
extern crate tokio;
use futures::prelude::*;
use futures::{future, stream};
use std::fmt::Debug;
use std::net::{SocketAddr, ToSocketAddrs};
fn resolve(addrs: impl ToSocketAddrs + Debug) -> impl Stream<Item = SocketAddr, Error = ()> {
match addrs.to_socket_addrs() {
Ok(iter) => stream::unfold(iter, |iter| match iter.next() {
Some(a) => Some(future::ok((a, iter))),
None => None,
}),
Err(e) => {
error!("could not resolve socket addresses {:?}: {:?}", addrs, e);
stream::empty()
}
}
}
fn main() {
let task = resolve("1.2.3.4:12345")
.map_err(|e| error!("{:?}", e))
.for_each(|addr| info!("{:?}", addr))
.fold();
tokio::run(task);
}
error[E0308]: match arms have incompatible types
--> src/main.rs:12:5
|
12 | / match addrs.to_socket_addrs() {
13 | | Ok(iter) => stream::unfold(iter, |iter| match iter.next() {
14 | | Some(a) => Some(future::ok((a, iter))),
15 | | None => None,
... |
20 | | }
21 | | }
| |_____^ expected struct `futures::stream::Unfold`, found struct `futures::stream::Empty`
|
= note: expected type `futures::stream::Unfold<<impl ToSocketAddrs + Debug as std::net::ToSocketAddrs>::Iter, [closure@src/main.rs:13:42: 16:10], futures::FutureResult<(std::net::SocketAddr, <impl ToSocketAddrs + Debug as std::net::ToSocketAddrs>::Iter), _>>`
found type `futures::stream::Empty<_, _>`
note: match arm with an incompatible type
--> src/main.rs:17:19
|
17 | Err(e) => {
| ___________________^
18 | | error!("could not resolve socket addresses {:?}: {:?}", addrs, e);
19 | | stream::empty()
20 | | }
| |_________^
error[E0277]: the trait bound `(): futures::Future` is not satisfied
--> src/main.rs:27:10
|
27 | .for_each(|addr| info!("{:?}", addr))
| ^^^^^^^^ the trait `futures::Future` is not implemented for `()`
|
= note: required because of the requirements on the impl of `futures::IntoFuture` for `()`
error[E0599]: no method named `fold` found for type `futures::stream::ForEach<futures::stream::MapErr<impl futures::Stream, [closure@src/main.rs:26:18: 26:39]>, [closure@src/main.rs:27:19: 27:45], ()>` in the current scope
--> src/main.rs:28:10
|
28 | .fold();
| ^^^^
|
= note: the method `fold` exists but the following trait bounds were not satisfied:
`&mut futures::stream::ForEach<futures::stream::MapErr<impl futures::Stream, [closure@src/main.rs:26:18: 26:39]>, [closure@src/main.rs:27:19: 27:45], ()> : futures::Stream`
`&mut futures::stream::ForEach<futures::stream::MapErr<impl futures::Stream, [closure@src/main.rs:26:18: 26:39]>, [closure@src/main.rs:27:19: 27:45], ()> : std::iter::Iterator`
提示很明显。我 return 来自 match
的两个 Result
不同,应该是相同的。现在,我该怎么做才能 return 流?
Rust 是一种静态类型语言,这意味着函数的 return 类型必须是编译时已知的单一类型。您正在尝试 return 多种类型,这在运行时决定。
最接近您原来的解决方案是始终return Unfold
流:
fn resolve(addrs: impl ToSocketAddrs) -> impl Stream<Item = SocketAddr, Error = ()> {
stream::unfold(addrs.to_socket_addrs(), |r| {
match r {
Ok(mut iter) => iter.next().map(|addr| future::ok((addr, Ok(iter)))),
Err(_) => None,
}
})
}
但为什么要重新发明轮子?
Converts an Iterator
into a Stream
which is always ready to yield the next value.
futures crate 的后续版本为 Either
实现了 Stream
,这使得它非常优雅:
fn resolve(addrs: impl ToSocketAddrs) -> impl Stream<Item = SocketAddr, Error = ()> {
match addrs.to_socket_addrs() {
Ok(iter) => stream::iter_ok(iter).left_stream(),
Err(_) => stream::empty().right_stream(),
}
}
将此功能反向移植到 futures 0.1 很简单(也许有人应该将其作为 PR 提交给那些卡在 0.1 上的人...):
enum MyEither<L, R> {
Left(L),
Right(R),
}
impl<L, R> Stream for MyEither<L, R>
where
L: Stream,
R: Stream<Item = L::Item, Error = L::Error>,
{
type Item = L::Item;
type Error = L::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match self {
MyEither::Left(l) => l.poll(),
MyEither::Right(r) => r.poll(),
}
}
}
trait EitherStreamExt {
fn left_stream<R>(self) -> MyEither<Self, R>
where
Self: Sized;
fn right_stream<L>(self) -> MyEither<L, Self>
where
Self: Sized;
}
impl<S: Stream> EitherStreamExt for S {
fn left_stream<R>(self) -> MyEither<Self, R> {
MyEither::Left(self)
}
fn right_stream<L>(self) -> MyEither<L, Self> {
MyEither::Right(self)
}
}
更好的是,利用 Result
是一个迭代器并且 Stream::flatten
存在的事实:
fn resolve(addrs: impl ToSocketAddrs) -> impl Stream<Item = SocketAddr, Error = ()> {
stream::iter_ok(addrs.to_socket_addrs())
.map(stream::iter_ok)
.flatten()
}
或者如果你真的想打印错误:
fn resolve(addrs: impl ToSocketAddrs) -> impl Stream<Item = SocketAddr, Error = ()> {
stream::once(addrs.to_socket_addrs())
.map(stream::iter_ok)
.map_err(|e| eprintln!("err: {}", e))
.flatten()
}
另请参阅:
- What is the correct way to return an Iterator (or any other trait)?
我想要 regular iterator and turn it into a stream 以便我可以进行进一步的流处理。问题是我可能有一个迭代器或一个错误要处理。我想我已经很接近这个了:
#[macro_use]
extern crate log;
extern crate futures; // 0.1.21
extern crate tokio;
use futures::prelude::*;
use futures::{future, stream};
use std::fmt::Debug;
use std::net::{SocketAddr, ToSocketAddrs};
fn resolve(addrs: impl ToSocketAddrs + Debug) -> impl Stream<Item = SocketAddr, Error = ()> {
match addrs.to_socket_addrs() {
Ok(iter) => stream::unfold(iter, |iter| match iter.next() {
Some(a) => Some(future::ok((a, iter))),
None => None,
}),
Err(e) => {
error!("could not resolve socket addresses {:?}: {:?}", addrs, e);
stream::empty()
}
}
}
fn main() {
let task = resolve("1.2.3.4:12345")
.map_err(|e| error!("{:?}", e))
.for_each(|addr| info!("{:?}", addr))
.fold();
tokio::run(task);
}
error[E0308]: match arms have incompatible types
--> src/main.rs:12:5
|
12 | / match addrs.to_socket_addrs() {
13 | | Ok(iter) => stream::unfold(iter, |iter| match iter.next() {
14 | | Some(a) => Some(future::ok((a, iter))),
15 | | None => None,
... |
20 | | }
21 | | }
| |_____^ expected struct `futures::stream::Unfold`, found struct `futures::stream::Empty`
|
= note: expected type `futures::stream::Unfold<<impl ToSocketAddrs + Debug as std::net::ToSocketAddrs>::Iter, [closure@src/main.rs:13:42: 16:10], futures::FutureResult<(std::net::SocketAddr, <impl ToSocketAddrs + Debug as std::net::ToSocketAddrs>::Iter), _>>`
found type `futures::stream::Empty<_, _>`
note: match arm with an incompatible type
--> src/main.rs:17:19
|
17 | Err(e) => {
| ___________________^
18 | | error!("could not resolve socket addresses {:?}: {:?}", addrs, e);
19 | | stream::empty()
20 | | }
| |_________^
error[E0277]: the trait bound `(): futures::Future` is not satisfied
--> src/main.rs:27:10
|
27 | .for_each(|addr| info!("{:?}", addr))
| ^^^^^^^^ the trait `futures::Future` is not implemented for `()`
|
= note: required because of the requirements on the impl of `futures::IntoFuture` for `()`
error[E0599]: no method named `fold` found for type `futures::stream::ForEach<futures::stream::MapErr<impl futures::Stream, [closure@src/main.rs:26:18: 26:39]>, [closure@src/main.rs:27:19: 27:45], ()>` in the current scope
--> src/main.rs:28:10
|
28 | .fold();
| ^^^^
|
= note: the method `fold` exists but the following trait bounds were not satisfied:
`&mut futures::stream::ForEach<futures::stream::MapErr<impl futures::Stream, [closure@src/main.rs:26:18: 26:39]>, [closure@src/main.rs:27:19: 27:45], ()> : futures::Stream`
`&mut futures::stream::ForEach<futures::stream::MapErr<impl futures::Stream, [closure@src/main.rs:26:18: 26:39]>, [closure@src/main.rs:27:19: 27:45], ()> : std::iter::Iterator`
提示很明显。我 return 来自 match
的两个 Result
不同,应该是相同的。现在,我该怎么做才能 return 流?
Rust 是一种静态类型语言,这意味着函数的 return 类型必须是编译时已知的单一类型。您正在尝试 return 多种类型,这在运行时决定。
最接近您原来的解决方案是始终return Unfold
流:
fn resolve(addrs: impl ToSocketAddrs) -> impl Stream<Item = SocketAddr, Error = ()> {
stream::unfold(addrs.to_socket_addrs(), |r| {
match r {
Ok(mut iter) => iter.next().map(|addr| future::ok((addr, Ok(iter)))),
Err(_) => None,
}
})
}
但为什么要重新发明轮子?
Converts an
Iterator
into aStream
which is always ready to yield the next value.
futures crate 的后续版本为 Either
实现了 Stream
,这使得它非常优雅:
fn resolve(addrs: impl ToSocketAddrs) -> impl Stream<Item = SocketAddr, Error = ()> {
match addrs.to_socket_addrs() {
Ok(iter) => stream::iter_ok(iter).left_stream(),
Err(_) => stream::empty().right_stream(),
}
}
将此功能反向移植到 futures 0.1 很简单(也许有人应该将其作为 PR 提交给那些卡在 0.1 上的人...):
enum MyEither<L, R> {
Left(L),
Right(R),
}
impl<L, R> Stream for MyEither<L, R>
where
L: Stream,
R: Stream<Item = L::Item, Error = L::Error>,
{
type Item = L::Item;
type Error = L::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match self {
MyEither::Left(l) => l.poll(),
MyEither::Right(r) => r.poll(),
}
}
}
trait EitherStreamExt {
fn left_stream<R>(self) -> MyEither<Self, R>
where
Self: Sized;
fn right_stream<L>(self) -> MyEither<L, Self>
where
Self: Sized;
}
impl<S: Stream> EitherStreamExt for S {
fn left_stream<R>(self) -> MyEither<Self, R> {
MyEither::Left(self)
}
fn right_stream<L>(self) -> MyEither<L, Self> {
MyEither::Right(self)
}
}
更好的是,利用 Result
是一个迭代器并且 Stream::flatten
存在的事实:
fn resolve(addrs: impl ToSocketAddrs) -> impl Stream<Item = SocketAddr, Error = ()> {
stream::iter_ok(addrs.to_socket_addrs())
.map(stream::iter_ok)
.flatten()
}
或者如果你真的想打印错误:
fn resolve(addrs: impl ToSocketAddrs) -> impl Stream<Item = SocketAddr, Error = ()> {
stream::once(addrs.to_socket_addrs())
.map(stream::iter_ok)
.map_err(|e| eprintln!("err: {}", e))
.flatten()
}
另请参阅:
- What is the correct way to return an Iterator (or any other trait)?