类型不匹配解决从期货通道转发消息到 WebSocket Sink 时的错误类型
Type mismatch resolving the error type when forwarding messages from a futures channel to a WebSocket Sink
我正在努力研究 Rust 中的期货,但我对这段代码感到困惑,该代码应该将到达 rx
的消息发送到 sink
:
extern crate futures;
extern crate tokio_core;
extern crate websocket;
use websocket::message::OwnedMessage;
use websocket::server::InvalidConnection;
use websocket::async::Server;
use tokio_core::reactor::Core;
use futures::{Future, Sink, Stream};
use futures::sync::mpsc;
use std::{thread, time};
use futures::sync::mpsc::Receiver;
fn main() {
let mut core = Core::new().unwrap();
let (mut tx, rx) = mpsc::channel(5);
thread::spawn(|| worker(rx));
let mut i = 0;
loop {
let res = tx.clone().send(OwnedMessage::Text(format!("Test {}", i)));
core.run(res);
i += 1;
let period = time::Duration::from_millis(200);
thread::sleep(period);
}
}
fn worker(rx: Receiver<OwnedMessage>) {
let mut core = Core::new().unwrap();
let handle = core.handle();
// bind to the server
let server = Server::bind("127.0.0.1:9000", &handle).unwrap();
let f = server.incoming()
// we don't wanna save the stream if it drops
.map_err(|InvalidConnection { error, .. }| error)
.for_each(|(upgrade, addr)| {
// accept the request to be a ws connection if it does
let f = upgrade
.use_protocol("rust-websocket")
.accept()
.and_then(|(s, _)| {
let (sink, stream) = s.split();
rx // using stream (echoing back) works
.forward(sink)
.map_err(|error| {
error
})
.and_then(|(a, sink)| {
sink.send(OwnedMessage::Close(None))
})
});
handle.spawn(f.map_err(move |e| println!("Err"))
.map(move |_| println!("Done")));
Ok(())
});
core.run(f).expect("somerror");
}
如评论中所述,使用 stream
作为输入效果很好。使用 rx
时,编译器会抱怨有关错误类型的类型不匹配(我相信):
error[E0271]: type mismatch resolving `<futures::stream::SplitSink<websocket::client::async::Framed<tokio_core::net::TcpStream, websocket::async::MessageCodec<websocket::OwnedMessage>>> as futures::Sink>::SinkError == ()`
--> src/main.rs:47:26
|
47 | .forward(sink)
| ^^^^^^^ expected enum `websocket::WebSocketError`, found ()
|
= note: expected type `websocket::WebSocketError`
found type `()`
error[E0599]: no method named `map_err` found for type `futures::stream::Forward<futures::sync::mpsc::Receiver<websocket::OwnedMessage>, futures::stream::SplitSink<websocket::client::async::Framed<tokio_core::net::TcpStream, websocket::async::MessageCodec<websocket::OwnedMessage>>>>` in the current scope
--> src/main.rs:48:26
|
48 | .map_err(|error| {
| ^^^^^^^
|
= note: the method `map_err` exists but the following trait bounds were not satisfied:
`futures::stream::Forward<futures::sync::mpsc::Receiver<websocket::OwnedMessage>, futures::stream::SplitSink<websocket::client::async::Framed<tokio_core::net::TcpStream, websocket::async::MessageCodec<websocket::OwnedMessage>>>> : futures::Future`
这些是我的依赖项:
[dependencies]
websocket = "0.20.0"
futures = "0.1"
tokio-core = "0.1"
我在这里错过了什么?
error[E0271]: type mismatch resolving
`<futures::stream::SplitSink<
websocket::client::async::Framed<
tokio_core::net::TcpStream,
websocket::async::MessageCodec<websocket::OwnedMessage>>>
as futures::Sink>::SinkError == ()`
我们这里有两种类型:<futures::stream::SplitSink<...> as futures::Sink>::SinkError
和()
。这两种类型从何而来?此外,第一个是未解析的关联类型;也许我们可以解决它以获得更多见解?一步步来追溯吧。
首先,我们需要弄清楚为什么编译器首先要尝试匹配这两种类型。如果我们查看 forward
的签名,我们将看到约束 Self::Error: From<S::SinkError>
。 Self
是我们调用 forward
的流的类型,而 S
是作为参数传递给 forward
.[=91= 的接收器的类型]
我们正在 rx
上调用 forward
,其类型为 futures::sync::mpsc::Receiver
。在 documentation page for Receiver
上,我们可以看到以下内容:
impl<T> Stream for Receiver<T>
type Item = T
type Error = ()
这向我们展示了 ()
的来源。现在让我们看看 sink
参数。
sink
的类型是 futures::stream::SplitSink<websocket::client::async::Framed<tokio_core::net::TcpStream, websocket::async::MessageCodec<websocket::OwnedMessage>>>
(我们从错误消息中知道这一点;RLS also confirms this). On the documentation page for SplitSink
,我们有:
impl<S: Sink> Sink for SplitSink<S>
type SinkItem = S::SinkItem
type SinkError = S::SinkError
因此 SplitSink
的 SinkError
与其内部水槽的 SinkError
相同。内部水槽的类型是 websocket::client::async::Framed<tokio_core::net::TcpStream, websocket::async::MessageCodec<websocket::OwnedMessage>>
。 documentation for Framed
说什么?
impl<T, U> Sink for Framed<T, U>
where
T: AsyncWrite,
U: Encoder,
<U as Encoder>::Error: From<Error>,
type SinkItem = <U as Encoder>::Item
type SinkError = <U as Encoder>::Error
Framed
有两个类型参数,但是我们只需要看第二个,也就是这里的websocket::async::MessageCodec<websocket::OwnedMessage>
,就可以确定SinkError
类型。我们来看看 MessageCodec
now. (Note: websocket::codec::ws::MessageCodec
is reexported 为 websocket::async::MessageCodec
.)
impl<M> Decoder for MessageCodec<M>
where
M: MessageTrait,
type Item = OwnedMessage
type Error = WebSocketError
啊哈!接收器产生 WebSocketError
.
类型的错误
现在我们已经弄清楚了类型,让我们回到我们最初关心类型的原因。我们试图理解为什么在调用 forward
时未满足约束条件 Self::Error: From<S::SinkError>
。我们现在知道编译器正在尝试解析 (): From<WebSocketError>
。好像没有impl From<WebSocketError> for ()
。让我们验证一下:
extern crate websocket;
fn main() {
let a = websocket::result::WebSocketError::NoDataAvailable;
let () = From::from(a);
}
的确,编译失败:
error[E0277]: the trait bound `(): std::convert::From<websocket::WebSocketError>` is not satisfied
--> src/main.rs:5:14
|
5 | let () = From::from(a);
| ^^^^^^^^^^ the trait `std::convert::From<websocket::WebSocketError>` is not implemented for `()`
|
= note: required by `std::convert::From::from`
我们可以通过使用 sink_map_err
更改 sink
的错误类型来解决缺少的实现。
let (sink, stream) = s.split();
let sink = sink.sink_map_err(|_| ()); // <<<<<
rx
.forward(sink)
.and_then(|(a, sink)| {
sink.send(OwnedMessage::Close(None))
})
这解决了对 forward
的调用,但现在此闭包的结果不与 upgrade.use_protocol("rust-websocket").accept()
组合,它的错误类型仍然是 WebSocketError
。改为更改 rx
的错误类型更有意义。但是我们如何从不携带任何信息的 ()
构造一个 WebSocketError
呢?
您可能想知道,为什么 Receiver
使用 ()
作为其错误类型?如果我们查看 source code, we can see that in fact, poll
never returns an error. I think it would be more appropriate if the error type was !
(the never type) or some other void type, to clearly indicate that errors are impossible; there's an issue open on futures 要求对期货 0.2 进行此更改。
因为不可能出错,所以我们不需要构造一个WebSocketError
;我们可以改为分散,例如恐慌。
fn worker(rx: Receiver<OwnedMessage>) {
let rx = rx.map_err(|()| panic!("Receiver should never fail!"));
let mut core = Core::new().unwrap();
let handle = core.handle();
// bind to the server
let server = Server::bind("127.0.0.1:9000", &handle).unwrap();
let f = server.incoming()
// we don't wanna save the stream if it drops
.map_err(|InvalidConnection { error, .. }| error)
.for_each(|(upgrade, addr)| {
// accept the request to be a ws connection if it does
let f = upgrade
.use_protocol("rust-websocket")
.accept()
.and_then(|(s, _)| {
let (sink, stream) = s.split();
rx
.forward(sink)
.and_then(|(a, sink)| {
sink.send(OwnedMessage::Close(None))
})
});
handle.spawn(f.map_err(move |e| println!("Err"))
.map(move |_| println!("Done")));
Ok(())
});
core.run(f).expect("somerror");
}
现在,还是有错误:
error[E0507]: cannot move out of captured outer variable in an `FnMut` closure
--> src/main.rs:43:31
|
30 | let rx = rx.map_err(|()| panic!("Receiver should never fail!"));
| -- captured outer variable
...
43 | .and_then(|(s, _)| {
| ^^^^^^^^ cannot move out of captured outer variable in an `FnMut` closure
为什么闭包试图移动 rx
?因为 forward
按值取 self
。为什么闭包是 FnMut
?注意,Future::and_then
requires an FnOnce
(it's valid to move a value from a captured variable into an FnOnce
closure), but Stream::for_each
需要 FnMut
。这是有道理的:for_each
将为每个传入连接调用一次闭包!
您正在使用的频道是 multi-producer, single-consumer(因此得名 mpsc),但您试图让多个消费者在这里(每个连接都试图从接收器读取)。我会把它留给你来解决你程序中的这个设计问题。请记住,可以有多个并发客户端连接!
我正在努力研究 Rust 中的期货,但我对这段代码感到困惑,该代码应该将到达 rx
的消息发送到 sink
:
extern crate futures;
extern crate tokio_core;
extern crate websocket;
use websocket::message::OwnedMessage;
use websocket::server::InvalidConnection;
use websocket::async::Server;
use tokio_core::reactor::Core;
use futures::{Future, Sink, Stream};
use futures::sync::mpsc;
use std::{thread, time};
use futures::sync::mpsc::Receiver;
fn main() {
let mut core = Core::new().unwrap();
let (mut tx, rx) = mpsc::channel(5);
thread::spawn(|| worker(rx));
let mut i = 0;
loop {
let res = tx.clone().send(OwnedMessage::Text(format!("Test {}", i)));
core.run(res);
i += 1;
let period = time::Duration::from_millis(200);
thread::sleep(period);
}
}
fn worker(rx: Receiver<OwnedMessage>) {
let mut core = Core::new().unwrap();
let handle = core.handle();
// bind to the server
let server = Server::bind("127.0.0.1:9000", &handle).unwrap();
let f = server.incoming()
// we don't wanna save the stream if it drops
.map_err(|InvalidConnection { error, .. }| error)
.for_each(|(upgrade, addr)| {
// accept the request to be a ws connection if it does
let f = upgrade
.use_protocol("rust-websocket")
.accept()
.and_then(|(s, _)| {
let (sink, stream) = s.split();
rx // using stream (echoing back) works
.forward(sink)
.map_err(|error| {
error
})
.and_then(|(a, sink)| {
sink.send(OwnedMessage::Close(None))
})
});
handle.spawn(f.map_err(move |e| println!("Err"))
.map(move |_| println!("Done")));
Ok(())
});
core.run(f).expect("somerror");
}
如评论中所述,使用 stream
作为输入效果很好。使用 rx
时,编译器会抱怨有关错误类型的类型不匹配(我相信):
error[E0271]: type mismatch resolving `<futures::stream::SplitSink<websocket::client::async::Framed<tokio_core::net::TcpStream, websocket::async::MessageCodec<websocket::OwnedMessage>>> as futures::Sink>::SinkError == ()`
--> src/main.rs:47:26
|
47 | .forward(sink)
| ^^^^^^^ expected enum `websocket::WebSocketError`, found ()
|
= note: expected type `websocket::WebSocketError`
found type `()`
error[E0599]: no method named `map_err` found for type `futures::stream::Forward<futures::sync::mpsc::Receiver<websocket::OwnedMessage>, futures::stream::SplitSink<websocket::client::async::Framed<tokio_core::net::TcpStream, websocket::async::MessageCodec<websocket::OwnedMessage>>>>` in the current scope
--> src/main.rs:48:26
|
48 | .map_err(|error| {
| ^^^^^^^
|
= note: the method `map_err` exists but the following trait bounds were not satisfied:
`futures::stream::Forward<futures::sync::mpsc::Receiver<websocket::OwnedMessage>, futures::stream::SplitSink<websocket::client::async::Framed<tokio_core::net::TcpStream, websocket::async::MessageCodec<websocket::OwnedMessage>>>> : futures::Future`
这些是我的依赖项:
[dependencies]
websocket = "0.20.0"
futures = "0.1"
tokio-core = "0.1"
我在这里错过了什么?
error[E0271]: type mismatch resolving `<futures::stream::SplitSink< websocket::client::async::Framed< tokio_core::net::TcpStream, websocket::async::MessageCodec<websocket::OwnedMessage>>> as futures::Sink>::SinkError == ()`
我们这里有两种类型:<futures::stream::SplitSink<...> as futures::Sink>::SinkError
和()
。这两种类型从何而来?此外,第一个是未解析的关联类型;也许我们可以解决它以获得更多见解?一步步来追溯吧。
首先,我们需要弄清楚为什么编译器首先要尝试匹配这两种类型。如果我们查看 forward
的签名,我们将看到约束 Self::Error: From<S::SinkError>
。 Self
是我们调用 forward
的流的类型,而 S
是作为参数传递给 forward
.[=91= 的接收器的类型]
我们正在 rx
上调用 forward
,其类型为 futures::sync::mpsc::Receiver
。在 documentation page for Receiver
上,我们可以看到以下内容:
impl<T> Stream for Receiver<T> type Item = T type Error = ()
这向我们展示了 ()
的来源。现在让我们看看 sink
参数。
sink
的类型是 futures::stream::SplitSink<websocket::client::async::Framed<tokio_core::net::TcpStream, websocket::async::MessageCodec<websocket::OwnedMessage>>>
(我们从错误消息中知道这一点;RLS also confirms this). On the documentation page for SplitSink
,我们有:
impl<S: Sink> Sink for SplitSink<S> type SinkItem = S::SinkItem type SinkError = S::SinkError
因此 SplitSink
的 SinkError
与其内部水槽的 SinkError
相同。内部水槽的类型是 websocket::client::async::Framed<tokio_core::net::TcpStream, websocket::async::MessageCodec<websocket::OwnedMessage>>
。 documentation for Framed
说什么?
impl<T, U> Sink for Framed<T, U> where T: AsyncWrite, U: Encoder, <U as Encoder>::Error: From<Error>, type SinkItem = <U as Encoder>::Item type SinkError = <U as Encoder>::Error
Framed
有两个类型参数,但是我们只需要看第二个,也就是这里的websocket::async::MessageCodec<websocket::OwnedMessage>
,就可以确定SinkError
类型。我们来看看 MessageCodec
now. (Note: websocket::codec::ws::MessageCodec
is reexported 为 websocket::async::MessageCodec
.)
impl<M> Decoder for MessageCodec<M> where M: MessageTrait, type Item = OwnedMessage type Error = WebSocketError
啊哈!接收器产生 WebSocketError
.
现在我们已经弄清楚了类型,让我们回到我们最初关心类型的原因。我们试图理解为什么在调用 forward
时未满足约束条件 Self::Error: From<S::SinkError>
。我们现在知道编译器正在尝试解析 (): From<WebSocketError>
。好像没有impl From<WebSocketError> for ()
。让我们验证一下:
extern crate websocket;
fn main() {
let a = websocket::result::WebSocketError::NoDataAvailable;
let () = From::from(a);
}
的确,编译失败:
error[E0277]: the trait bound `(): std::convert::From<websocket::WebSocketError>` is not satisfied
--> src/main.rs:5:14
|
5 | let () = From::from(a);
| ^^^^^^^^^^ the trait `std::convert::From<websocket::WebSocketError>` is not implemented for `()`
|
= note: required by `std::convert::From::from`
我们可以通过使用 sink_map_err
更改 sink
的错误类型来解决缺少的实现。
let (sink, stream) = s.split();
let sink = sink.sink_map_err(|_| ()); // <<<<<
rx
.forward(sink)
.and_then(|(a, sink)| {
sink.send(OwnedMessage::Close(None))
})
这解决了对 forward
的调用,但现在此闭包的结果不与 upgrade.use_protocol("rust-websocket").accept()
组合,它的错误类型仍然是 WebSocketError
。改为更改 rx
的错误类型更有意义。但是我们如何从不携带任何信息的 ()
构造一个 WebSocketError
呢?
您可能想知道,为什么 Receiver
使用 ()
作为其错误类型?如果我们查看 source code, we can see that in fact, poll
never returns an error. I think it would be more appropriate if the error type was !
(the never type) or some other void type, to clearly indicate that errors are impossible; there's an issue open on futures 要求对期货 0.2 进行此更改。
因为不可能出错,所以我们不需要构造一个WebSocketError
;我们可以改为分散,例如恐慌。
fn worker(rx: Receiver<OwnedMessage>) {
let rx = rx.map_err(|()| panic!("Receiver should never fail!"));
let mut core = Core::new().unwrap();
let handle = core.handle();
// bind to the server
let server = Server::bind("127.0.0.1:9000", &handle).unwrap();
let f = server.incoming()
// we don't wanna save the stream if it drops
.map_err(|InvalidConnection { error, .. }| error)
.for_each(|(upgrade, addr)| {
// accept the request to be a ws connection if it does
let f = upgrade
.use_protocol("rust-websocket")
.accept()
.and_then(|(s, _)| {
let (sink, stream) = s.split();
rx
.forward(sink)
.and_then(|(a, sink)| {
sink.send(OwnedMessage::Close(None))
})
});
handle.spawn(f.map_err(move |e| println!("Err"))
.map(move |_| println!("Done")));
Ok(())
});
core.run(f).expect("somerror");
}
现在,还是有错误:
error[E0507]: cannot move out of captured outer variable in an `FnMut` closure
--> src/main.rs:43:31
|
30 | let rx = rx.map_err(|()| panic!("Receiver should never fail!"));
| -- captured outer variable
...
43 | .and_then(|(s, _)| {
| ^^^^^^^^ cannot move out of captured outer variable in an `FnMut` closure
为什么闭包试图移动 rx
?因为 forward
按值取 self
。为什么闭包是 FnMut
?注意,Future::and_then
requires an FnOnce
(it's valid to move a value from a captured variable into an FnOnce
closure), but Stream::for_each
需要 FnMut
。这是有道理的:for_each
将为每个传入连接调用一次闭包!
您正在使用的频道是 multi-producer, single-consumer(因此得名 mpsc),但您试图让多个消费者在这里(每个连接都试图从接收器读取)。我会把它留给你来解决你程序中的这个设计问题。请记住,可以有多个并发客户端连接!