如何通过 futures::Sink 发送项目列表?
How do I send a list of items through a futures::Sink?
我有一个要通过期货发送的项目列表::Sink
:
let mut list = VecDeque::new();
/* add a bunch of Packet items to list */
let (sink, stream) = tcp_stream.framed(PacketCodec).split();
我可以使用
发送一个数据包
if let Some(first) = list.pop_front() {
sink.send(first);
}
如何发送整个列表?
使用新软件时,我发现在深入研究之前先阅读文档非常有用。总的来说,Rust 社区提供了相当不错的资源。
例如,Tokio 项目有一个 entire site of documentation which includes a selection of working examples. The generated API documentation for Sink
在这里也是无价的。
我建议所有程序员学会做的另一件事是创建一个 MCVE 问题。这使他们能够专注于问题的核心,同时消除问题并磨练问题背后的文字。这是这种情况的一个:
use futures::{Sink, SinkExt}; // 0.3.4
fn thing(mut sink: impl Sink<i32>) {
let mut all_the_things = vec![1, 2, 3, 4, 5];
while let Some(v) = all_the_things.pop() {
sink.send(v);
}
}
我们收到错误消息
error[E0277]: the trait bound `impl Sink<i32>: std::marker::Unpin` is not satisfied
--> src/lib.rs:7:14
|
3 | fn thing(mut sink: impl Sink<i32>) {
| -------------- help: consider further restricting this bound: `impl Sink<i32> + std::marker::Unpin`
...
7 | sink.send(v);
| ^^^^ the trait `std::marker::Unpin` is not implemented for `impl Sink<i32>`
让我们返回 API 文档以获取 SinkExt::send
...
fn send(&mut self, item: Item) -> Send<Self, Item>
where
Self: Unpin,
由此可见,send
需要类型实现Unpin
,所以我们改一下:
use futures::{Sink, SinkExt}; // 0.3.4
use std::marker::Unpin;
fn thing(mut sink: impl Sink<i32> + Unpin) {
let mut all_the_things = vec![1, 2, 3, 4, 5];
while let Some(v) = all_the_things.pop() {
sink.send(v);
}
}
这可以编译,但有一个警告:
warning: unused `futures_util::sink::send::Send` that must be used
--> src/lib.rs:8:9
|
8 | sink.send(v);
| ^^^^^^^^^^^^^
|
= note: `#[warn(unused_must_use)]` on by default
= note: futures do nothing unless you `.await` or poll them
再次查看 API,我们看到它 returns 一个 Send
值。这很奇怪,不是吗?实际上 并不奇怪,一旦您看到 Send
实现了 Future
- 如果流已满,将某些内容推入流可能会阻塞;这就是背压的概念。将结果设为 Future
是您可以知道项目何时实际添加到流中的方式。
一个解决方案是用 .await
驱动那个未来完成并制定函数 async
:
use futures::{Sink, SinkExt}; // 0.3.4
use std::marker::Unpin;
async fn thing(mut sink: impl Sink<i32> + Unpin) {
let mut all_the_things = vec![1, 2, 3, 4, 5];
while let Some(v) = all_the_things.pop() {
sink.send(v)
.await
.unwrap_or_else(|_| panic!("Unable to send item"));
}
}
这可以编译,但不是很漂亮。回到文档,我们还可以看到 Sink::send_all
, which takes a TryStream
. We can create a Stream
from an iterator by using stream::iter
并驱动 send_all
返回的 future 完成:
use futures::{stream, Sink, SinkExt}; // 0.3.4
use std::marker::Unpin;
async fn thing(mut sink: impl Sink<i32> + Unpin) {
let all_the_things = vec![1, 2, 3, 4, 5];
let mut stream = stream::iter(all_the_things.into_iter().map(Ok));
sink.send_all(&mut stream)
.await
.unwrap_or_else(|_| panic!("Unable to send item"));
}
我有一个要通过期货发送的项目列表::Sink
:
let mut list = VecDeque::new();
/* add a bunch of Packet items to list */
let (sink, stream) = tcp_stream.framed(PacketCodec).split();
我可以使用
发送一个数据包if let Some(first) = list.pop_front() {
sink.send(first);
}
如何发送整个列表?
使用新软件时,我发现在深入研究之前先阅读文档非常有用。总的来说,Rust 社区提供了相当不错的资源。
例如,Tokio 项目有一个 entire site of documentation which includes a selection of working examples. The generated API documentation for Sink
在这里也是无价的。
我建议所有程序员学会做的另一件事是创建一个 MCVE 问题。这使他们能够专注于问题的核心,同时消除问题并磨练问题背后的文字。这是这种情况的一个:
use futures::{Sink, SinkExt}; // 0.3.4
fn thing(mut sink: impl Sink<i32>) {
let mut all_the_things = vec![1, 2, 3, 4, 5];
while let Some(v) = all_the_things.pop() {
sink.send(v);
}
}
我们收到错误消息
error[E0277]: the trait bound `impl Sink<i32>: std::marker::Unpin` is not satisfied
--> src/lib.rs:7:14
|
3 | fn thing(mut sink: impl Sink<i32>) {
| -------------- help: consider further restricting this bound: `impl Sink<i32> + std::marker::Unpin`
...
7 | sink.send(v);
| ^^^^ the trait `std::marker::Unpin` is not implemented for `impl Sink<i32>`
让我们返回 API 文档以获取 SinkExt::send
...
fn send(&mut self, item: Item) -> Send<Self, Item>
where
Self: Unpin,
由此可见,send
需要类型实现Unpin
,所以我们改一下:
use futures::{Sink, SinkExt}; // 0.3.4
use std::marker::Unpin;
fn thing(mut sink: impl Sink<i32> + Unpin) {
let mut all_the_things = vec![1, 2, 3, 4, 5];
while let Some(v) = all_the_things.pop() {
sink.send(v);
}
}
这可以编译,但有一个警告:
warning: unused `futures_util::sink::send::Send` that must be used
--> src/lib.rs:8:9
|
8 | sink.send(v);
| ^^^^^^^^^^^^^
|
= note: `#[warn(unused_must_use)]` on by default
= note: futures do nothing unless you `.await` or poll them
再次查看 API,我们看到它 returns 一个 Send
值。这很奇怪,不是吗?实际上 并不奇怪,一旦您看到 Send
实现了 Future
- 如果流已满,将某些内容推入流可能会阻塞;这就是背压的概念。将结果设为 Future
是您可以知道项目何时实际添加到流中的方式。
一个解决方案是用 .await
驱动那个未来完成并制定函数 async
:
use futures::{Sink, SinkExt}; // 0.3.4
use std::marker::Unpin;
async fn thing(mut sink: impl Sink<i32> + Unpin) {
let mut all_the_things = vec![1, 2, 3, 4, 5];
while let Some(v) = all_the_things.pop() {
sink.send(v)
.await
.unwrap_or_else(|_| panic!("Unable to send item"));
}
}
这可以编译,但不是很漂亮。回到文档,我们还可以看到 Sink::send_all
, which takes a TryStream
. We can create a Stream
from an iterator by using stream::iter
并驱动 send_all
返回的 future 完成:
use futures::{stream, Sink, SinkExt}; // 0.3.4
use std::marker::Unpin;
async fn thing(mut sink: impl Sink<i32> + Unpin) {
let all_the_things = vec![1, 2, 3, 4, 5];
let mut stream = stream::iter(all_the_things.into_iter().map(Ok));
sink.send_all(&mut stream)
.await
.unwrap_or_else(|_| panic!("Unable to send item"));
}