一个go中出错关闭多个goroutine

Close multiple goroutine if an error occurs in one in go

考虑这个功能:

func doAllWork() error {

    var wg sync.WaitGroup

    for i := 0; i < 2; i++ {

        wg.add(1)
        go func() {

            defer wg.Done()
            for j := 0; j < 10; j++ {
                result, err := work(j)
                if err != nil {
                    // can't use `return err` here
                    // what sould I put instead ? 
                    os.Exit(0)
                }
            }
        }()
    }
    wg.Wait()

    return nil
}

在每个goroutine中,函数work()被调用了10次。如果对 work() returns 的调用在任何 运行 goroutines 中出错,我希望所有 goroutines 立即停止,并退出程序。 可以在这里使用 os.Exit() 吗?我该如何处理?


编辑:这个问题与how to stop a goroutine不同,因为这里我需要在一个

发生错误时关闭所有goroutines

您可以使用为这样的事情创建的 context 包 ("carries deadlines, cancelation signals...")。

您创建了一个能够使用 context.WithCancel() (parent context may be the one returned by context.Background()) 发布取消信号的上下文。这将为您 return 您提供一个 cancel() 函数,该函数可用于取消(或更准确地说 信号 取消意图)到 worker goroutines。
在 worker goroutines 中,你必须通过检查由 Context.Done() 编辑的通道 return 是否关闭来检查这种意图是否已经启动,最简单的方法是尝试从它接收(如果它立即进行已关闭)。并进行非阻塞检查(如果它未关闭,您可以继续),请使用 select 语句和 default 分支。

我将使用以下 work() 实现,它模拟 10% 的失败几率,并模拟 1 秒的工作:

func work(i int) (int, error) {
    if rand.Intn(100) < 10 { // 10% of failure
        return 0, errors.New("random error")
    }
    time.Sleep(time.Second)
    return 100 + i, nil
}

doAllWork() 可能如下所示:

func doAllWork() error {
    var wg sync.WaitGroup

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel() // Make sure it's called to release resources even if no errors

    for i := 0; i < 2; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()

            for j := 0; j < 10; j++ {
                // Check if any error occurred in any other gorouties:
                select {
                case <-ctx.Done():
                    return // Error somewhere, terminate
                default: // Default is must to avoid blocking
                }
                result, err := work(j)
                if err != nil {
                    fmt.Printf("Worker #%d during %d, error: %v\n", i, j, err)
                    cancel()
                    return
                }
                fmt.Printf("Worker #%d finished %d, result: %d.\n", i, j, result)
            }
        }(i)
    }
    wg.Wait()

    return ctx.Err()
}

测试方法如下:

func main() {
    rand.Seed(time.Now().UnixNano() + 1) // +1 'cause Playground's time is fixed
    fmt.Printf("doAllWork: %v\n", doAllWork())
}

输出(在 Go Playground 上尝试):

Worker #0 finished 0, result: 100.
Worker #1 finished 0, result: 100.
Worker #1 finished 1, result: 101.
Worker #0 finished 1, result: 101.
Worker #0 finished 2, result: 102.
Worker #1 finished 2, result: 102.
Worker #1 finished 3, result: 103.
Worker #1 during 4, error: random error
Worker #0 finished 3, result: 103.
doAllWork: context canceled

如果没有错误,例如使用以下 work() 函数时:

func work(i int) (int, error) {
    time.Sleep(time.Second)
    return 100 + i, nil
}

输出就像(在 Go Playground 上尝试):

Worker #0 finished 0, result: 100.
Worker #1 finished 0, result: 100.
Worker #1 finished 1, result: 101.
Worker #0 finished 1, result: 101.
Worker #0 finished 2, result: 102.
Worker #1 finished 2, result: 102.
Worker #1 finished 3, result: 103.
Worker #0 finished 3, result: 103.
Worker #0 finished 4, result: 104.
Worker #1 finished 4, result: 104.
Worker #1 finished 5, result: 105.
Worker #0 finished 5, result: 105.
Worker #0 finished 6, result: 106.
Worker #1 finished 6, result: 106.
Worker #1 finished 7, result: 107.
Worker #0 finished 7, result: 107.
Worker #0 finished 8, result: 108.
Worker #1 finished 8, result: 108.
Worker #1 finished 9, result: 109.
Worker #0 finished 9, result: 109.
doAllWork: <nil>

备注:

基本上我们只是使用上下文的 Done() 通道,所以看起来我们可以很容易地(如果不是更容易的话)使用 done 通道而不是 Context , 关闭通道以执行 cancel() 在上述解决方案中所做的事情。

这不是真的。 这只能在只有一个 goroutine 可以关闭通道的情况下使用,但在我们的例子中,任何一个 worker 都可以这样做。并且试图关闭一个已经关闭的通道会出现恐慌(详情请见此处: )。因此,您必须确保围绕 close(done) 进行某种同步/排除,这将降低它的可读性甚至更复杂。实际上,这正是 cancel() 函数在幕后所做的,隐藏/抽象远离您的眼睛,因此 cancel() 可能会被多次调用以使您的代码/使用更简单。

如何从工作人员处获取并return错误?

为此您可以使用错误通道:

errs := make(chan error, 2) // Buffer for 2 errors

并且在 workers 内部遇到错误时,将其发送到通道而不是打印出来:

result, err := work(j)
if err != nil {
    errs <- fmt.Errorf("Worker #%d during %d, error: %v\n", i, j, err)
    cancel()
    return
}

并且在循环之后,如果出现错误,return(否则 nil):

// Return (first) error, if any:
if ctx.Err() != nil {
    return <-errs
}
return nil

这次输出(在Go Playground上试试):

Worker #0 finished 0, result: 100.
Worker #1 finished 0, result: 100.
Worker #1 finished 1, result: 101.
Worker #0 finished 1, result: 101.
Worker #0 finished 2, result: 102.
Worker #1 finished 2, result: 102.
Worker #1 finished 3, result: 103.
Worker #0 finished 3, result: 103.
doAllWork: Worker #1 during 4, error: random error

请注意,我使用了一个缓冲区大小等于工作人员数量的缓冲通道,这确保了在其上发送始终是非阻塞的。这也使您有可能接收和处理所有错误,而不仅仅是一个(例如第一个)。另一种选择是使用缓冲通道仅容纳 1,并对其进行非阻塞发送,如下所示:

errs := make(chan error, 1) // Buffered only for the first error

// ...and inside the worker:

result, err := work(j)
if err != nil {
    // Non-blocking send:
    select {
    case errs <- fmt.Errorf("Worker #%d during %d, error: %v\n", i, j, err):
    default:
    }
    cancel()
    return
}

另一种方法是使用 errgroup.WithContext。你可以在这个example.

中查看

简而言之,g.Wait() 等待第一个错误发生或所有错误完成。当任何 goroutines 发生错误时(在提供的示例中超时),它会通过 ctx.Done() 通道取消其他 goroutines 中的执行。

更清晰的方法是使用 errgroup (documentation).

errgroup 为处理共同任务的子任务的 goroutine 组提供同步、错误传播和上下文取消。

你可以看看这个例子(playground):

    var g errgroup.Group
    var urls = []string{
        "http://www.golang.org/",
        "http://www.google.com/",
        "http://www.somestupidname.com/",
    }

    for _, url := range urls {
        // Launch a goroutine to fetch the URL.
        url := url // https://golang.org/doc/faq#closures_and_goroutines
        
       g.Go(func() error {
            // Fetch the URL.
            resp, err := http.Get(url)
            if err == nil {
                resp.Body.Close()
            }
            return err
        })
    }
   
    // Wait for all HTTP fetches to complete.
    if err := g.Wait(); err == nil {
        fmt.Println("Successfully fetched all URLs.")
    
    } else {

        // After all have run, at least one of them has returned an error!
       // But all have to finish their work!
       // If you want to stop others goroutines when one fail, go ahead reading!
        fmt.Println("Unsuccessfully fetched URLs.")
    }

但注意:Go documentation 中的 The first call to return a non-nil error cancels the group 短语有点误导。

其实errgroup.Group 如果创建了一个context(WithContext函数),会调用context的cancel函数return当组中的 goroutine 将 return 出错时,由 WithContext 编辑,否则什么都不做 (read the source code here!).

所以,如果你想关闭不同的 goroutines,你必须使用上下文 returned my WithContext 并在它们内部自行管理,errgroup 将关闭它语境! Here you can find an example.

总而言之,errgroup 可以以不同的方式使用,如 examples 所示。

  1. “只是错误”,如上例: Wait 等待所有 goroutines 结束,然后 return 发送第一个非零错误(如果有的话),或者 return nil.

  2. 并行: 您必须使用 WithContext function 创建组并使用上下文来管理上下文关闭。 I created a playground example here with some sleeps! 您必须手动关闭每个 goroutine,但是使用上下文,您可以在关闭上下文时结束它们。

  3. 管道(在 examples 中查看更多内容)。