流式命令从 Goroutine 输出进度

Streaming commands output progress from Goroutine

问题解决了长运行ning命令的打印进度问题。

我试图将打印代码放在一个 goroutine 中,但扫描器声称已经立即命中 EOF 并且永远不会执行 for 块。

第一次执行 Scan() 方法时执行的 bufio.scan 代码是:

    // We cannot generate a token with what we are holding.
    // If we've already hit EOF or an I/O error, we are done.
    if s.err != nil {
        // Shut it down.
        s.start = 0
        s.end = 0
        return false
    }

如果我打印 s.err 输出是 EOF.

我尝试 运行 的代码是:

cmd := exec.Command("some", "command")
c := make(chan int, 1)

go func(cmd *exec.Cmd, c chan int) {
    stdout, _ := cmd.StdoutPipe()

    <-c

    scanner := bufio.NewScanner(stdout)
    for scanner.Scan() {
        m := scanner.Text()
        fmt.Println(m)
    }
}(cmd, c)

cmd.Start()

c <- 1

cmd.Wait()

想法是启动 Goroutine,获取 cmd.stdout,等待 cmd 启动,然后开始处理它的输出。

结果是执行了长命令,程序等待其完成,但没有任何内容打印到终端。

知道为什么第一次调用 scanner.Scan()stdout 已经达到 EOF 了吗?

存在一些问题:

  • 在读取所有数据之前正在关闭管道。
  • 始终检查错误
  • c <- struct{}{} 之后开始 cmd.Start() 并使用无缓冲通道 c := make(chan struct{})

两个工作示例代码:

1:等待使用通道,然后在使用 defer func() { c <- struct{}{} }() EOF 之后关闭管道,就像这样的工作示例代码:

package main

import (
    "bufio"
    "fmt"
    "os/exec"
)

func main() {
    cmd := exec.Command("Streamer")
    c := make(chan struct{})

    go run(cmd, c)

    c <- struct{}{}
    cmd.Start()

    <-c
    if err := cmd.Wait(); err != nil {
        fmt.Println(err)
    }
    fmt.Println("done.")
}

func run(cmd *exec.Cmd, c chan struct{}) {
    defer func() { c <- struct{}{} }()
    stdout, err := cmd.StdoutPipe()
    if err != nil {
        panic(err)
    }
    <-c
    scanner := bufio.NewScanner(stdout)
    for scanner.Scan() {
        m := scanner.Text()
        fmt.Println(m)
    }
    fmt.Println("EOF")
}

2:您也可以等待使用 sync.WaitGroup,就像这个工作示例代码:

package main

import (
    "bufio"
    "fmt"
    "os/exec"
    "sync"
)

var wg sync.WaitGroup

func main() {
    cmd := exec.Command("Streamer")
    c := make(chan struct{})
    wg.Add(1)
    go func(cmd *exec.Cmd, c chan struct{}) {
        defer wg.Done()
        stdout, err := cmd.StdoutPipe()
        if err != nil {
            panic(err)
        }
        <-c
        scanner := bufio.NewScanner(stdout)
        for scanner.Scan() {
            m := scanner.Text()
            fmt.Println(m)
        }
    }(cmd, c)

    c <- struct{}{}
    cmd.Start()

    wg.Wait()
    fmt.Println("done.")
}

和Streamer示例代码(仅供测试):

package main

import "fmt"
import "time"

func main() {
    for i := 0; i < 10; i++ {
        time.Sleep(1 * time.Second)
        fmt.Println(i, ":", time.Now().UTC())
    }
}

并查看 func (c *Cmd) StdoutPipe() (io.ReadCloser, error) 文档:

StdoutPipe returns a pipe that will be connected to the command's standard output when the command starts.

Wait will close the pipe after seeing the command exit, so most callers need not close the pipe themselves; however, an implication is that it is incorrect to call Wait before all reads from the pipe have completed. For the same reason, it is incorrect to call Run when using StdoutPipe. See the example for idiomatic usage.

来自 godocs:

StdoutPipe returns a pipe that will be connected to the command's standard output when the command starts.

Wait will close the pipe after seeing the command exit, so most callers need not close the pipe themselves; however, an implication is that it is incorrect to call Wait before all reads from the pipe have completed.

您在启动命令后立即调用 Wait()。因此,在确保您已从管道中读取所有数据之前,管道会在命令完成后立即关闭。尝试在扫描循环后将 Wait() 移至您的 go 例程。

go func(cmd *exec.Cmd, c chan int) {
    stdout, _ := cmd.StdoutPipe()

    <-c

    scanner := bufio.NewScanner(stdout)
    for scanner.Scan() {
        m := scanner.Text()
        fmt.Println(m)
    }

    cmd.Wait()
    c <- 1
}(cmd, c)

cmd.Start()
c <- 1

// This is here so we don't exit the program early,
<-c

还有一个更简单的方法,就是把os.stdout指定为cmd的stdout,让命令直接写入os.stdout:

cmd := exec.Command("some", "command")
cmd.Stdout = os.Stdout
cmd.Run()