从命名管道连续读取

Read continuously from a named pipe

我想知道我还有哪些其他选项可以使用 golang 从命名管道连续读取。我当前的代码依赖于 gorutine 中的无限循环 运行;但 hat 将一个 CPU 保持在 100% 使用率。

func main() {
....

var wg sync.WaitGroup
fpipe, _ := os.OpenFile(namedPipe, os.O_RDONLY, 0600)
defer fpipe.Close()

f, _ := os.Create("dump.txt")
defer f.Close()
var buff bytes.Buffer

wg.Add(1)
go func() {
        for {
          io.Copy(&buff, fpipe)
          if buff.Len() > 0 {
              buff.WriteTo(f)
           }
         }
    }()

    wg.Wait()
}

命名管道 reader 将在没有写入者时收到 EOF。此代码之外的解决方案是确保始终有一个写入进程持有文件描述符,尽管它不需要写入任何内容。

在 Go 程序中,如果你想等待一个新的写入者,你将不得不在你的 for 循环中轮询 io.Reader。您当前的代码通过一个繁忙的循环来执行此操作,它将消耗 100% 的 1 cpu 核心。在其他错误上添加睡眠和方法 return 将解决此问题:

for {
    err := io.Copy(&buff, fpipe)
    if buff.Len() > 0 {
        buff.WriteTo(f)
    }

    if err != nil {
        // something other than EOF happened
        return
    }

    time.Sleep(100 * time.Millisecond)
}

简介

如前所述,如果没有写入者,命名管道 reader 将收到 EOF。

但是我发现@JimB 的解决方案不是最优的:

  1. 命名管道有一个最大容量(65kB,iirc),它很可能在 100 毫秒的睡眠周期内被填满。当缓冲区已满时,所有写入程序都会无缘无故地阻塞。
  2. 如果重新启动,您平均会丢失 50 毫秒的数据。同样,没有充分的理由。
  3. 如果要使用静态缓冲区进行复制,io.CopyBuffer(dst Writer, src Reader, buf []byte) (written int64, err error) would be the better solution, imho. But this is not even necessary, as io.Copy(或底层实现)实际上分配了32kB的缓冲区。

我的做法

更好的解决方案是等待写入发生并立即将命名管道的内容复制到目标文件。在大多数系统上,有某种关于文件系统事件的通知。包 github.com/rjeczalik/notify 可用于访问我们感兴趣的事件,因为写入事件在大多数重要操作系统上都可以跨平台工作。另一个我们感兴趣的事件是命名管道的删除,因为我们没有任何东西可以读取。

因此,我的解决方案是:

package main

import (
    "flag"
    "io"
    "log"
    "os"

    "github.com/rjeczalik/notify"
)

const (
    MAX_CONCURRENT_WRITERS = 5
)

var (
    pipePath string
    filePath string
)

func init() {
    flag.StringVar(&pipePath, "pipe", "", "/path/to/named_pipe to read from")
    flag.StringVar(&filePath, "file", "out.txt", "/path/to/output file")
    log.SetOutput(os.Stderr)
}

func main() {
    flag.Parse()

    var p, f *os.File
    var err error
    var e notify.EventInfo

    // The usual stuff: checking wether the named pipe exists etc
    if p, err = os.Open(pipePath); os.IsNotExist(err) {
        log.Fatalf("Named pipe '%s' does not exist", pipePath)
    } else if os.IsPermission(err) {
        log.Fatalf("Insufficient permissions to read named pipe '%s': %s", pipePath, err)
    } else if err != nil {
        log.Fatalf("Error while opening named pipe '%s': %s", pipePath, err)
    }
    // Yep, there and readable. Close the file handle on exit
    defer p.Close()

    // Do the same for the output file
    if f, err = os.OpenFile(filePath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600); os.IsNotExist(err) {
        log.Fatalf("File '%s' does not exist", filePath)
    } else if os.IsPermission(err) {
        log.Fatalf("Insufficient permissions to open/create file '%s' for appending: %s", filePath, err)
    } else if err != nil {
        log.Fatalf("Error while opening file '%s' for writing: %err", filePath, err)
    }
    // Again, close the filehandle on exit
    defer f.Close()

    // Here is where it happens. We create a buffered channel for events which might happen
    // on the file. The reason why we make it buffered to the number of expected concurrent writers
    // is that if all writers would (theoretically) write at once or at least pretty close
    // to each other, it might happen that we loose event. This is due to the underlying implementations
    // not because of go.
    c := make(chan notify.EventInfo, MAX_CONCURRENT_WRITERS)

    // Here we tell notify to watch out named pipe for events, Write and Remove events
    // specifically. We watch for remove events, too, as this removes the file handle we
    // read from, making reads impossible
    notify.Watch(pipePath, c, notify.Write|notify.Remove)

    // We start an infinite loop...
    for {
        // ...waiting for an event to be passed.
        e = <-c

        switch e.Event() {

        case notify.Write:
            // If it a a write event, we copy the content of the named pipe to
            // our output file and wait for the next event to happen.
            // Note that this is idempotent: Even if we have huge writes by multiple
            // writers on the named pipe, the first call to Copy will copy the contents.
            // The time to copy that data may well be longer than it takes to generate the events.
            // However, subsequent calls may copy nothing, but that does not do any harm.
            io.Copy(f, p)

        case notify.Remove:
            // Some user or process has removed the named pipe,
            // so we have nothing left to read from.
            // We should inform the user and quit.
            log.Fatalf("Named pipe '%s' was removed. Quitting", pipePath)
        }
    }
}

问题:当 'last writer' 关闭管道时,您会收到 EOF,即使稍后可能会出现新的编写器。

解决方法:自己打开管道写,不要关闭。现在,您可以将读取端视为永无止境的读取,而无需获得 EOF。将以下内容直接放在打开管道进行读取的位置之后:

nullWriter, err := os.OpenFile(pipePath, os.O_WRONLY, os.ModeNamedPipe)
if err != nil {
  logger.Crit("Error opening pipe for (placeholder) write", "err", err)
}
defer nullWriter.Close()