如何通过标准输入实现阻塞迭代器?

How to implement blocking iterator over stdin?

我需要实现一个长运行ning 程序,它通过标准输入接收消息。该协议定义消息的形式为长度指示符(为简单起见,1 字节整数),然后是长度指示符表示的长度字符串。消息不由任何空格分隔。 该程序应使用来自 stdin 的所有消息并等待另一条消息。

如何在标准输入上实现这种等待?

我以一种尝试从 stdin 读取并在出现错误时重复的方式实现了迭代器。它有效,但效率很低。 我希望迭代器在新数据到来时读取消息。

我的实现使用 read_exact:

use std::io::{Read, stdin, Error as IOError, ErrorKind};

pub struct In<R>(R) where R: Read;

pub trait InStream{
    fn read_one(&mut self) -> Result<String, IOError>;
}

impl <R>In<R> where R: Read{
    pub fn new(stdin: R) -> In<R> {
        In(stdin)
    }
}

impl <R>InStream for In<R> where R: Read{
    /// Read one message from stdin and return it as string
    fn read_one(&mut self) -> Result<String, IOError>{

        const length_indicator: usize = 1;
        let stdin = &mut self.0;

        let mut size: [u8;length_indicator] = [0; length_indicator];
        stdin.read_exact(&mut size)?;
        let size = u8::from_be_bytes(size) as usize;

        let mut buffer = vec![0u8; size];
        let _bytes_read = stdin.read_exact(&mut buffer);
        String::from_utf8(buffer).map_err(|_| IOError::new(ErrorKind::InvalidData, "not utf8"))
    }
}
impl <R>Iterator for In<R> where R:Read{
    type Item = String;
    fn next(&mut self) -> Option<String>{
        self.read_one()
            .ok()
    }
}

fn main(){
    let mut in_stream = In::new(stdin());
    loop{
        match in_stream.next(){
            Some(x) => println!("x: {:?}", x),
            None => (),
        }
    }
}

我查看了 Read 和 BufReader 文档,但是 none 方法似乎解决了我的问题,因为 read 文档包含以下文本:

This function does not provide any guarantees about whether it blocks waiting for data, but if an object needs to block for a read and cannot, it will typically signal this via an Err return value.

如何实现在标准输入上等待数据?

===

编辑:不阻塞和循环给出 UnexpectedEof 错误而不是等待数据的最小用例:

use std::io::{Read, stdin};
fn main(){
    let mut stdin = stdin();
    let mut stdin_handle = stdin.lock();
    loop{
        let mut buffer = vec![0u8; 4];
        let res = stdin_handle.read_exact(&mut buffer);
        println!("res: {:?}", res);
        println!("buffer: {:?}", buffer);
    }

我 运行 它在 OSX 上 cargo run < in 其中 in 是命名管道。我用 echo -n "1234" > in.

填充管道

它等待第一个输入,然后循环。

res: Ok(())
buffer: [49, 50, 51, 52]
res: Err(Error { kind: UnexpectedEof, message: "failed to fill whole buffer" })
buffer: [0, 0, 0, 0]
res: Err(Error { kind: UnexpectedEof, message: "failed to fill whole buffer" })
buffer: [0, 0, 0, 0]
res: Err(Error { kind: UnexpectedEof, message: "failed to fill whole buffer" })
buffer: [0, 0, 0, 0]
res: Err(Error { kind: UnexpectedEof, message: "failed to fill whole buffer" })
buffer: [0, 0, 0, 0]
res: Err(Error { kind: UnexpectedEof, message: "failed to fill whole buffer" })
...

我希望程序等到有足够的数据来填充缓冲区。

正如其他人所解释的那样,Read 上的文档写得非常笼统,不适用于标准输入, 阻塞。换句话说,您添加了缓冲的代码没问题。

问题是你如何使用管道。例如,如果您在一个 shell 中 运行 mkfifo foo; cat <foo,在另一个 echo -n bla >foo 中,您会看到第一个 shell 中的 cat ] 将显示 foo 并退出。关闭管道的最后一个编写器会将 EOF 发送到 reader,使程序的 stdin 无用。

您可以通过在后台启动另一个程序来解决此问题,该程序以写入模式打开管道并且永不退出,例如 tail -f /dev/null >pipe-filename。然后 echo -n bla >foo 将被您的程序观察到,但不会导致其标准输入关闭。管道写入端的“保持”也可以通过 Rust 实现。