一个go中出错关闭多个goroutine
Close multiple goroutine if an error occurs in one in go
考虑这个功能:
func doAllWork() error {
var wg sync.WaitGroup
for i := 0; i < 2; i++ {
wg.add(1)
go func() {
defer wg.Done()
for j := 0; j < 10; j++ {
result, err := work(j)
if err != nil {
// can't use `return err` here
// what sould I put instead ?
os.Exit(0)
}
}
}()
}
wg.Wait()
return nil
}
在每个goroutine中,函数work()
被调用了10次。如果对 work()
returns 的调用在任何 运行 goroutines 中出错,我希望所有 goroutines 立即停止,并退出程序。
可以在这里使用 os.Exit()
吗?我该如何处理?
编辑:这个问题与how to stop a goroutine不同,因为这里我需要在一个
发生错误时关闭所有goroutines
您可以使用为这样的事情创建的 context
包 ("carries deadlines, cancelation signals...")。
您创建了一个能够使用 context.WithCancel()
(parent context may be the one returned by context.Background()
) 发布取消信号的上下文。这将为您 return 您提供一个 cancel()
函数,该函数可用于取消(或更准确地说 信号 取消意图)到 worker goroutines。
在 worker goroutines 中,你必须通过检查由 Context.Done()
编辑的通道 return 是否关闭来检查这种意图是否已经启动,最简单的方法是尝试从它接收(如果它立即进行已关闭)。并进行非阻塞检查(如果它未关闭,您可以继续),请使用 select
语句和 default
分支。
我将使用以下 work()
实现,它模拟 10% 的失败几率,并模拟 1 秒的工作:
func work(i int) (int, error) {
if rand.Intn(100) < 10 { // 10% of failure
return 0, errors.New("random error")
}
time.Sleep(time.Second)
return 100 + i, nil
}
而 doAllWork()
可能如下所示:
func doAllWork() error {
var wg sync.WaitGroup
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // Make sure it's called to release resources even if no errors
for i := 0; i < 2; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
for j := 0; j < 10; j++ {
// Check if any error occurred in any other gorouties:
select {
case <-ctx.Done():
return // Error somewhere, terminate
default: // Default is must to avoid blocking
}
result, err := work(j)
if err != nil {
fmt.Printf("Worker #%d during %d, error: %v\n", i, j, err)
cancel()
return
}
fmt.Printf("Worker #%d finished %d, result: %d.\n", i, j, result)
}
}(i)
}
wg.Wait()
return ctx.Err()
}
测试方法如下:
func main() {
rand.Seed(time.Now().UnixNano() + 1) // +1 'cause Playground's time is fixed
fmt.Printf("doAllWork: %v\n", doAllWork())
}
输出(在 Go Playground 上尝试):
Worker #0 finished 0, result: 100.
Worker #1 finished 0, result: 100.
Worker #1 finished 1, result: 101.
Worker #0 finished 1, result: 101.
Worker #0 finished 2, result: 102.
Worker #1 finished 2, result: 102.
Worker #1 finished 3, result: 103.
Worker #1 during 4, error: random error
Worker #0 finished 3, result: 103.
doAllWork: context canceled
如果没有错误,例如使用以下 work()
函数时:
func work(i int) (int, error) {
time.Sleep(time.Second)
return 100 + i, nil
}
输出就像(在 Go Playground 上尝试):
Worker #0 finished 0, result: 100.
Worker #1 finished 0, result: 100.
Worker #1 finished 1, result: 101.
Worker #0 finished 1, result: 101.
Worker #0 finished 2, result: 102.
Worker #1 finished 2, result: 102.
Worker #1 finished 3, result: 103.
Worker #0 finished 3, result: 103.
Worker #0 finished 4, result: 104.
Worker #1 finished 4, result: 104.
Worker #1 finished 5, result: 105.
Worker #0 finished 5, result: 105.
Worker #0 finished 6, result: 106.
Worker #1 finished 6, result: 106.
Worker #1 finished 7, result: 107.
Worker #0 finished 7, result: 107.
Worker #0 finished 8, result: 108.
Worker #1 finished 8, result: 108.
Worker #1 finished 9, result: 109.
Worker #0 finished 9, result: 109.
doAllWork: <nil>
备注:
基本上我们只是使用上下文的 Done()
通道,所以看起来我们可以很容易地(如果不是更容易的话)使用 done
通道而不是 Context
, 关闭通道以执行 cancel()
在上述解决方案中所做的事情。
这不是真的。 这只能在只有一个 goroutine 可以关闭通道的情况下使用,但在我们的例子中,任何一个 worker 都可以这样做。并且试图关闭一个已经关闭的通道会出现恐慌(详情请见此处: )。因此,您必须确保围绕 close(done)
进行某种同步/排除,这将降低它的可读性甚至更复杂。实际上,这正是 cancel()
函数在幕后所做的,隐藏/抽象远离您的眼睛,因此 cancel()
可能会被多次调用以使您的代码/使用更简单。
如何从工作人员处获取并return错误?
为此您可以使用错误通道:
errs := make(chan error, 2) // Buffer for 2 errors
并且在 workers 内部遇到错误时,将其发送到通道而不是打印出来:
result, err := work(j)
if err != nil {
errs <- fmt.Errorf("Worker #%d during %d, error: %v\n", i, j, err)
cancel()
return
}
并且在循环之后,如果出现错误,return(否则 nil
):
// Return (first) error, if any:
if ctx.Err() != nil {
return <-errs
}
return nil
这次输出(在Go Playground上试试):
Worker #0 finished 0, result: 100.
Worker #1 finished 0, result: 100.
Worker #1 finished 1, result: 101.
Worker #0 finished 1, result: 101.
Worker #0 finished 2, result: 102.
Worker #1 finished 2, result: 102.
Worker #1 finished 3, result: 103.
Worker #0 finished 3, result: 103.
doAllWork: Worker #1 during 4, error: random error
请注意,我使用了一个缓冲区大小等于工作人员数量的缓冲通道,这确保了在其上发送始终是非阻塞的。这也使您有可能接收和处理所有错误,而不仅仅是一个(例如第一个)。另一种选择是使用缓冲通道仅容纳 1,并对其进行非阻塞发送,如下所示:
errs := make(chan error, 1) // Buffered only for the first error
// ...and inside the worker:
result, err := work(j)
if err != nil {
// Non-blocking send:
select {
case errs <- fmt.Errorf("Worker #%d during %d, error: %v\n", i, j, err):
default:
}
cancel()
return
}
另一种方法是使用 errgroup.WithContext
。你可以在这个example.
中查看
简而言之,g.Wait()
等待第一个错误发生或所有错误完成。当任何 goroutines 发生错误时(在提供的示例中超时),它会通过 ctx.Done()
通道取消其他 goroutines 中的执行。
更清晰的方法是使用 errgroup
(documentation).
包 errgroup
为处理共同任务的子任务的 goroutine 组提供同步、错误传播和上下文取消。
你可以看看这个例子(playground):
var g errgroup.Group
var urls = []string{
"http://www.golang.org/",
"http://www.google.com/",
"http://www.somestupidname.com/",
}
for _, url := range urls {
// Launch a goroutine to fetch the URL.
url := url // https://golang.org/doc/faq#closures_and_goroutines
g.Go(func() error {
// Fetch the URL.
resp, err := http.Get(url)
if err == nil {
resp.Body.Close()
}
return err
})
}
// Wait for all HTTP fetches to complete.
if err := g.Wait(); err == nil {
fmt.Println("Successfully fetched all URLs.")
} else {
// After all have run, at least one of them has returned an error!
// But all have to finish their work!
// If you want to stop others goroutines when one fail, go ahead reading!
fmt.Println("Unsuccessfully fetched URLs.")
}
但注意:Go
documentation 中的 The first call to return a non-nil error cancels the group
短语有点误导。
其实errgroup.Group
如果创建了一个context(WithContext
函数),会调用context的cancel函数return当组中的 goroutine 将 return 出错时,由 WithContext
编辑,否则什么都不做 (read the source code here!).
所以,如果你想关闭不同的 goroutines,你必须使用上下文 returned my WithContext
并在它们内部自行管理,errgroup
将关闭它语境!
Here you can find an example.
总而言之,errgroup
可以以不同的方式使用,如 examples 所示。
“只是错误”,如上例:
Wait
等待所有 goroutines 结束,然后 return 发送第一个非零错误(如果有的话),或者 return nil
.
并行:
您必须使用 WithContext
function 创建组并使用上下文来管理上下文关闭。
I created a playground example here with some sleeps!
您必须手动关闭每个 goroutine,但是使用上下文,您可以在关闭上下文时结束它们。
管道(在 examples 中查看更多内容)。
考虑这个功能:
func doAllWork() error {
var wg sync.WaitGroup
for i := 0; i < 2; i++ {
wg.add(1)
go func() {
defer wg.Done()
for j := 0; j < 10; j++ {
result, err := work(j)
if err != nil {
// can't use `return err` here
// what sould I put instead ?
os.Exit(0)
}
}
}()
}
wg.Wait()
return nil
}
在每个goroutine中,函数work()
被调用了10次。如果对 work()
returns 的调用在任何 运行 goroutines 中出错,我希望所有 goroutines 立即停止,并退出程序。
可以在这里使用 os.Exit()
吗?我该如何处理?
编辑:这个问题与how to stop a goroutine不同,因为这里我需要在一个
发生错误时关闭所有goroutines您可以使用为这样的事情创建的 context
包 ("carries deadlines, cancelation signals...")。
您创建了一个能够使用 context.WithCancel()
(parent context may be the one returned by context.Background()
) 发布取消信号的上下文。这将为您 return 您提供一个 cancel()
函数,该函数可用于取消(或更准确地说 信号 取消意图)到 worker goroutines。
在 worker goroutines 中,你必须通过检查由 Context.Done()
编辑的通道 return 是否关闭来检查这种意图是否已经启动,最简单的方法是尝试从它接收(如果它立即进行已关闭)。并进行非阻塞检查(如果它未关闭,您可以继续),请使用 select
语句和 default
分支。
我将使用以下 work()
实现,它模拟 10% 的失败几率,并模拟 1 秒的工作:
func work(i int) (int, error) {
if rand.Intn(100) < 10 { // 10% of failure
return 0, errors.New("random error")
}
time.Sleep(time.Second)
return 100 + i, nil
}
而 doAllWork()
可能如下所示:
func doAllWork() error {
var wg sync.WaitGroup
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // Make sure it's called to release resources even if no errors
for i := 0; i < 2; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
for j := 0; j < 10; j++ {
// Check if any error occurred in any other gorouties:
select {
case <-ctx.Done():
return // Error somewhere, terminate
default: // Default is must to avoid blocking
}
result, err := work(j)
if err != nil {
fmt.Printf("Worker #%d during %d, error: %v\n", i, j, err)
cancel()
return
}
fmt.Printf("Worker #%d finished %d, result: %d.\n", i, j, result)
}
}(i)
}
wg.Wait()
return ctx.Err()
}
测试方法如下:
func main() {
rand.Seed(time.Now().UnixNano() + 1) // +1 'cause Playground's time is fixed
fmt.Printf("doAllWork: %v\n", doAllWork())
}
输出(在 Go Playground 上尝试):
Worker #0 finished 0, result: 100.
Worker #1 finished 0, result: 100.
Worker #1 finished 1, result: 101.
Worker #0 finished 1, result: 101.
Worker #0 finished 2, result: 102.
Worker #1 finished 2, result: 102.
Worker #1 finished 3, result: 103.
Worker #1 during 4, error: random error
Worker #0 finished 3, result: 103.
doAllWork: context canceled
如果没有错误,例如使用以下 work()
函数时:
func work(i int) (int, error) {
time.Sleep(time.Second)
return 100 + i, nil
}
输出就像(在 Go Playground 上尝试):
Worker #0 finished 0, result: 100.
Worker #1 finished 0, result: 100.
Worker #1 finished 1, result: 101.
Worker #0 finished 1, result: 101.
Worker #0 finished 2, result: 102.
Worker #1 finished 2, result: 102.
Worker #1 finished 3, result: 103.
Worker #0 finished 3, result: 103.
Worker #0 finished 4, result: 104.
Worker #1 finished 4, result: 104.
Worker #1 finished 5, result: 105.
Worker #0 finished 5, result: 105.
Worker #0 finished 6, result: 106.
Worker #1 finished 6, result: 106.
Worker #1 finished 7, result: 107.
Worker #0 finished 7, result: 107.
Worker #0 finished 8, result: 108.
Worker #1 finished 8, result: 108.
Worker #1 finished 9, result: 109.
Worker #0 finished 9, result: 109.
doAllWork: <nil>
备注:
基本上我们只是使用上下文的 Done()
通道,所以看起来我们可以很容易地(如果不是更容易的话)使用 done
通道而不是 Context
, 关闭通道以执行 cancel()
在上述解决方案中所做的事情。
这不是真的。 这只能在只有一个 goroutine 可以关闭通道的情况下使用,但在我们的例子中,任何一个 worker 都可以这样做。并且试图关闭一个已经关闭的通道会出现恐慌(详情请见此处: close(done)
进行某种同步/排除,这将降低它的可读性甚至更复杂。实际上,这正是 cancel()
函数在幕后所做的,隐藏/抽象远离您的眼睛,因此 cancel()
可能会被多次调用以使您的代码/使用更简单。
如何从工作人员处获取并return错误?
为此您可以使用错误通道:
errs := make(chan error, 2) // Buffer for 2 errors
并且在 workers 内部遇到错误时,将其发送到通道而不是打印出来:
result, err := work(j)
if err != nil {
errs <- fmt.Errorf("Worker #%d during %d, error: %v\n", i, j, err)
cancel()
return
}
并且在循环之后,如果出现错误,return(否则 nil
):
// Return (first) error, if any:
if ctx.Err() != nil {
return <-errs
}
return nil
这次输出(在Go Playground上试试):
Worker #0 finished 0, result: 100.
Worker #1 finished 0, result: 100.
Worker #1 finished 1, result: 101.
Worker #0 finished 1, result: 101.
Worker #0 finished 2, result: 102.
Worker #1 finished 2, result: 102.
Worker #1 finished 3, result: 103.
Worker #0 finished 3, result: 103.
doAllWork: Worker #1 during 4, error: random error
请注意,我使用了一个缓冲区大小等于工作人员数量的缓冲通道,这确保了在其上发送始终是非阻塞的。这也使您有可能接收和处理所有错误,而不仅仅是一个(例如第一个)。另一种选择是使用缓冲通道仅容纳 1,并对其进行非阻塞发送,如下所示:
errs := make(chan error, 1) // Buffered only for the first error
// ...and inside the worker:
result, err := work(j)
if err != nil {
// Non-blocking send:
select {
case errs <- fmt.Errorf("Worker #%d during %d, error: %v\n", i, j, err):
default:
}
cancel()
return
}
另一种方法是使用 errgroup.WithContext
。你可以在这个example.
简而言之,g.Wait()
等待第一个错误发生或所有错误完成。当任何 goroutines 发生错误时(在提供的示例中超时),它会通过 ctx.Done()
通道取消其他 goroutines 中的执行。
更清晰的方法是使用 errgroup
(documentation).
包 errgroup
为处理共同任务的子任务的 goroutine 组提供同步、错误传播和上下文取消。
你可以看看这个例子(playground):
var g errgroup.Group
var urls = []string{
"http://www.golang.org/",
"http://www.google.com/",
"http://www.somestupidname.com/",
}
for _, url := range urls {
// Launch a goroutine to fetch the URL.
url := url // https://golang.org/doc/faq#closures_and_goroutines
g.Go(func() error {
// Fetch the URL.
resp, err := http.Get(url)
if err == nil {
resp.Body.Close()
}
return err
})
}
// Wait for all HTTP fetches to complete.
if err := g.Wait(); err == nil {
fmt.Println("Successfully fetched all URLs.")
} else {
// After all have run, at least one of them has returned an error!
// But all have to finish their work!
// If you want to stop others goroutines when one fail, go ahead reading!
fmt.Println("Unsuccessfully fetched URLs.")
}
但注意:Go
documentation 中的 The first call to return a non-nil error cancels the group
短语有点误导。
其实errgroup.Group
如果创建了一个context(WithContext
函数),会调用context的cancel函数return当组中的 goroutine 将 return 出错时,由 WithContext
编辑,否则什么都不做 (read the source code here!).
所以,如果你想关闭不同的 goroutines,你必须使用上下文 returned my WithContext
并在它们内部自行管理,errgroup
将关闭它语境!
Here you can find an example.
总而言之,errgroup
可以以不同的方式使用,如 examples 所示。
“只是错误”,如上例:
Wait
等待所有 goroutines 结束,然后 return 发送第一个非零错误(如果有的话),或者 returnnil
.并行: 您必须使用
WithContext
function 创建组并使用上下文来管理上下文关闭。 I created a playground example here with some sleeps! 您必须手动关闭每个 goroutine,但是使用上下文,您可以在关闭上下文时结束它们。管道(在 examples 中查看更多内容)。