如何通过 futures::Sink 发送项目列表?

How do I send a list of items through a futures::Sink?

我有一个要通过期货发送的项目列表::Sink:

let mut list = VecDeque::new();
/* add a bunch of Packet items to list */
let (sink, stream) = tcp_stream.framed(PacketCodec).split();

我可以使用

发送一个数据包
if let Some(first) = list.pop_front() {
    sink.send(first);
}

如何发送整个列表?

使用新软件时,我发现在深入研究之前先阅读文档非常有用。总的来说,Rust 社区提供了相当不错的资源。

例如,Tokio 项目有一个 entire site of documentation which includes a selection of working examples. The generated API documentation for Sink 在这里也是无价的。

我建议所有程序员学会做的另一件事是创建一个 MCVE 问题。这使他们能够专注于问题的核心,同时消除问题并磨练问题背后的文字。这是这种情况的一个:

use futures::{Sink, SinkExt}; // 0.3.4

fn thing(mut sink: impl Sink<i32>) {
    let mut all_the_things = vec![1, 2, 3, 4, 5];

    while let Some(v) = all_the_things.pop() {
        sink.send(v);
    }
}

我们收到错误消息

error[E0277]: the trait bound `impl Sink<i32>: std::marker::Unpin` is not satisfied
 --> src/lib.rs:7:14
  |
3 | fn thing(mut sink: impl Sink<i32>) {
  |                    -------------- help: consider further restricting this bound: `impl Sink<i32> + std::marker::Unpin`
...
7 |         sink.send(v);
  |              ^^^^ the trait `std::marker::Unpin` is not implemented for `impl Sink<i32>`

让我们返回 API 文档以获取 SinkExt::send...

fn send(&mut self, item: Item) -> Send<Self, Item>
where
    Self: Unpin, 

由此可见,send需要类型实现Unpin,所以我们改一下:

use futures::{Sink, SinkExt}; // 0.3.4
use std::marker::Unpin;

fn thing(mut sink: impl Sink<i32> + Unpin) {
    let mut all_the_things = vec![1, 2, 3, 4, 5];

    while let Some(v) = all_the_things.pop() {
        sink.send(v);
    }
}

这可以编译,但有一个警告:

warning: unused `futures_util::sink::send::Send` that must be used
 --> src/lib.rs:8:9
  |
8 |         sink.send(v);
  |         ^^^^^^^^^^^^^
  |
  = note: `#[warn(unused_must_use)]` on by default
  = note: futures do nothing unless you `.await` or poll them

再次查看 API,我们看到它 returns 一个 Send 值。这很奇怪,不是吗?实际上 并不奇怪,一旦您看到 Send 实现了 Future - 如果流已满,将某些内容推入流可能会阻塞;这就是背压的概念。将结果设为 Future 是您可以知道项目何时实际添加到流中的方式。

一个解决方案是用 .await 驱动那个未来完成并制定函数 async:

use futures::{Sink, SinkExt}; // 0.3.4
use std::marker::Unpin;

async fn thing(mut sink: impl Sink<i32> + Unpin) {
    let mut all_the_things = vec![1, 2, 3, 4, 5];

    while let Some(v) = all_the_things.pop() {
        sink.send(v)
            .await
            .unwrap_or_else(|_| panic!("Unable to send item"));
    }
}

这可以编译,但不是很漂亮。回到文档,我们还可以看到 Sink::send_all, which takes a TryStream. We can create a Stream from an iterator by using stream::iter 并驱动 send_all 返回的 future 完成:

use futures::{stream, Sink, SinkExt}; // 0.3.4
use std::marker::Unpin;

async fn thing(mut sink: impl Sink<i32> + Unpin) {
    let all_the_things = vec![1, 2, 3, 4, 5];
    let mut stream = stream::iter(all_the_things.into_iter().map(Ok));

    sink.send_all(&mut stream)
        .await
        .unwrap_or_else(|_| panic!("Unable to send item"));
}