如何将迭代器在成功时转换为流或在失败时转换为空流?

How do I convert an iterator into a stream on success or an empty stream on failure?

我想要 regular iterator and turn it into a stream 以便我可以进行进一步的流处理。问题是我可能有一个迭代器或一个错误要处理。我想我已经很接近这个了:

#[macro_use]
extern crate log;
extern crate futures; // 0.1.21
extern crate tokio;

use futures::prelude::*;
use futures::{future, stream};
use std::fmt::Debug;
use std::net::{SocketAddr, ToSocketAddrs};

fn resolve(addrs: impl ToSocketAddrs + Debug) -> impl Stream<Item = SocketAddr, Error = ()> {
    match addrs.to_socket_addrs() {
        Ok(iter) => stream::unfold(iter, |iter| match iter.next() {
            Some(a) => Some(future::ok((a, iter))),
            None => None,
        }),
        Err(e) => {
            error!("could not resolve socket addresses {:?}: {:?}", addrs, e);
            stream::empty()
        }
    }
}

fn main() {
    let task = resolve("1.2.3.4:12345")
        .map_err(|e| error!("{:?}", e))
        .for_each(|addr| info!("{:?}", addr))
        .fold();
    tokio::run(task);
}

playground

error[E0308]: match arms have incompatible types
  --> src/main.rs:12:5
   |
12 | /     match addrs.to_socket_addrs() {
13 | |         Ok(iter) => stream::unfold(iter, |iter| match iter.next() {
14 | |             Some(a) => Some(future::ok((a, iter))),
15 | |             None => None,
...  |
20 | |         }
21 | |     }
   | |_____^ expected struct `futures::stream::Unfold`, found struct `futures::stream::Empty`
   |
   = note: expected type `futures::stream::Unfold<<impl ToSocketAddrs + Debug as std::net::ToSocketAddrs>::Iter, [closure@src/main.rs:13:42: 16:10], futures::FutureResult<(std::net::SocketAddr, <impl ToSocketAddrs + Debug as std::net::ToSocketAddrs>::Iter), _>>`
              found type `futures::stream::Empty<_, _>`
note: match arm with an incompatible type
  --> src/main.rs:17:19
   |
17 |           Err(e) => {
   |  ___________________^
18 | |             error!("could not resolve socket addresses {:?}: {:?}", addrs, e);
19 | |             stream::empty()
20 | |         }
   | |_________^

error[E0277]: the trait bound `(): futures::Future` is not satisfied
  --> src/main.rs:27:10
   |
27 |         .for_each(|addr| info!("{:?}", addr))
   |          ^^^^^^^^ the trait `futures::Future` is not implemented for `()`
   |
   = note: required because of the requirements on the impl of `futures::IntoFuture` for `()`

error[E0599]: no method named `fold` found for type `futures::stream::ForEach<futures::stream::MapErr<impl futures::Stream, [closure@src/main.rs:26:18: 26:39]>, [closure@src/main.rs:27:19: 27:45], ()>` in the current scope
  --> src/main.rs:28:10
   |
28 |         .fold();
   |          ^^^^
   |
   = note: the method `fold` exists but the following trait bounds were not satisfied:
           `&mut futures::stream::ForEach<futures::stream::MapErr<impl futures::Stream, [closure@src/main.rs:26:18: 26:39]>, [closure@src/main.rs:27:19: 27:45], ()> : futures::Stream`
           `&mut futures::stream::ForEach<futures::stream::MapErr<impl futures::Stream, [closure@src/main.rs:26:18: 26:39]>, [closure@src/main.rs:27:19: 27:45], ()> : std::iter::Iterator`

提示很明显。我 return 来自 match 的两个 Result 不同,应该是相同的。现在,我该怎么做才能 return 流?

Rust 是一种静态类型语言,这意味着函数的 return 类型必须是编译时已知的单一类型。您正在尝试 return 多种类型,这在运行时决定。

最接近您原来的解决方案是始终return Unfold 流:

fn resolve(addrs: impl ToSocketAddrs) -> impl Stream<Item = SocketAddr, Error = ()> {
    stream::unfold(addrs.to_socket_addrs(), |r| {
        match r {
            Ok(mut iter) => iter.next().map(|addr| future::ok((addr, Ok(iter)))),
            Err(_) => None,
        }
    })
}

但为什么要重新发明轮子?

futures::stream::iter_ok

Converts an Iterator into a Stream which is always ready to yield the next value.

futures crate 的后续版本为 Either 实现了 Stream,这使得它非常优雅:

fn resolve(addrs: impl ToSocketAddrs) -> impl Stream<Item = SocketAddr, Error = ()> {
    match addrs.to_socket_addrs() {
        Ok(iter) => stream::iter_ok(iter).left_stream(),
        Err(_) => stream::empty().right_stream(),
    }
}

将此功能反向移植到 futures 0.1 很简单(也许有人应该将其作为 PR 提交给那些卡在 0.1 上的人...):

enum MyEither<L, R> {
    Left(L),
    Right(R),
}

impl<L, R> Stream for MyEither<L, R>
where
    L: Stream,
    R: Stream<Item = L::Item, Error = L::Error>,
{
    type Item = L::Item;
    type Error = L::Error;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        match self {
            MyEither::Left(l) => l.poll(),
            MyEither::Right(r) => r.poll(),
        }
    }
}

trait EitherStreamExt {
    fn left_stream<R>(self) -> MyEither<Self, R>
    where
        Self: Sized;
    fn right_stream<L>(self) -> MyEither<L, Self>
    where
        Self: Sized;
}

impl<S: Stream> EitherStreamExt for S {
    fn left_stream<R>(self) -> MyEither<Self, R> {
        MyEither::Left(self)
    }
    fn right_stream<L>(self) -> MyEither<L, Self> {
        MyEither::Right(self)
    }
}

更好的是,利用 Result 是一个迭代器并且 Stream::flatten 存在的事实:

fn resolve(addrs: impl ToSocketAddrs) -> impl Stream<Item = SocketAddr, Error = ()> {
    stream::iter_ok(addrs.to_socket_addrs())
        .map(stream::iter_ok)
        .flatten()
}

或者如果你真的想打印错误:

fn resolve(addrs: impl ToSocketAddrs) -> impl Stream<Item = SocketAddr, Error = ()> {
    stream::once(addrs.to_socket_addrs())
        .map(stream::iter_ok)
        .map_err(|e| eprintln!("err: {}", e))
        .flatten()
}

另请参阅:

  • What is the correct way to return an Iterator (or any other trait)?