嵌套的 errgroup 在一堆 goroutines 中
Nested errgroup inside bunch of goroutines
我对 golang 及其并发原则还很陌生。我的用例涉及对一批实体执行多个 http 请求(针对单个实体)。如果某个实体的任何 http 请求失败,我需要停止对它的所有并行 http 请求。此外,我必须管理因错误而失败的实体数。我正在尝试在实体 goroutine 中实现 errorgroup,这样如果单个实体的任何 http 请求失败,errorgroup 就会终止,并向其父 goroutine 发送 return 错误。但我不确定如何维护错误计数。
func main(entity[] string) {
errorC := make(chan string) // channel to insert failed entity
var wg sync.WaitGroup
for _, link := range entity {
wg.Add(1)
// Spawn errorgroup here. errorgroup_spawn
}
go func() {
wg.Wait()
close(errorC)
}()
for msg := range errorC {
// here storing error entityIds somewhere.
}
}
和这样的错误组
func errorgroup_spawn(ctx context.Context, errorC chan string, wg *sync.WaitGroup) { // and other params
defer (*wg).Done()
goRoutineCollection, ctxx := errgroup.WithContext(ctx)
results := make(chan *result)
goRoutineCollection.Go(func() error {
// http calls for single entity
// if error occurs, push it in errorC, and return Error.
return nil
})
go func() {
goRoutineCollection.Wait()
close(result)
}()
return goRoutineCollection.Wait()
}
PS: 我也在考虑应用嵌套错误组,但想不出维护错误计数,而 运行 其他错误组
谁能指导我,这是处理此类现实世界场景的正确方法吗?
跟踪错误的一种方法是使用状态结构来跟踪哪个错误来自哪里:
type Status struct {
Entity string
Err error
}
...
errorC := make(chan Status)
// Spawn error groups with name of the entity, and when error happens, push Status{Entity:entityName,Err:err} to the chanel
然后您可以从错误通道中读取所有错误并找出失败的原因。
另一种选择是根本不使用错误组。这使事情更加明确,但它是否更好是值得商榷的:
// Keep entity statuses
statuses:=make([]Status,len(entity))
for i, link := range entity {
statuses[i].Entity=link
wg.Add(1)
go func(i index) {
defer wg.Done()
ctx, cancel:=context.WithCancel(context.Background())
defer cancel()
// Error collector
status:=make(chan error)
defer close(status)
go func() {
for st:=range status {
if st!=nil {
cancel() // Stop all calls
// store first error
if statuses[i].Err==nil {
statuses[i].Err=st
}
}
}
}()
innerWg:=sync.WaitGroup{}
innerWg.Add(1)
go func() {
defer innerWg.Done()
status<- makeHttpCall(ctx)
}()
innerWg.Add(1)
go func() {
defer innerWg.Done()
status<- makeHttpCall(ctx)
}()
...
innerWg.Wait()
}(i)
}
当一切都完成后,statuses
将包含所有实体和相应的状态。
我对 golang 及其并发原则还很陌生。我的用例涉及对一批实体执行多个 http 请求(针对单个实体)。如果某个实体的任何 http 请求失败,我需要停止对它的所有并行 http 请求。此外,我必须管理因错误而失败的实体数。我正在尝试在实体 goroutine 中实现 errorgroup,这样如果单个实体的任何 http 请求失败,errorgroup 就会终止,并向其父 goroutine 发送 return 错误。但我不确定如何维护错误计数。
func main(entity[] string) {
errorC := make(chan string) // channel to insert failed entity
var wg sync.WaitGroup
for _, link := range entity {
wg.Add(1)
// Spawn errorgroup here. errorgroup_spawn
}
go func() {
wg.Wait()
close(errorC)
}()
for msg := range errorC {
// here storing error entityIds somewhere.
}
}
和这样的错误组
func errorgroup_spawn(ctx context.Context, errorC chan string, wg *sync.WaitGroup) { // and other params
defer (*wg).Done()
goRoutineCollection, ctxx := errgroup.WithContext(ctx)
results := make(chan *result)
goRoutineCollection.Go(func() error {
// http calls for single entity
// if error occurs, push it in errorC, and return Error.
return nil
})
go func() {
goRoutineCollection.Wait()
close(result)
}()
return goRoutineCollection.Wait()
}
PS: 我也在考虑应用嵌套错误组,但想不出维护错误计数,而 运行 其他错误组 谁能指导我,这是处理此类现实世界场景的正确方法吗?
跟踪错误的一种方法是使用状态结构来跟踪哪个错误来自哪里:
type Status struct {
Entity string
Err error
}
...
errorC := make(chan Status)
// Spawn error groups with name of the entity, and when error happens, push Status{Entity:entityName,Err:err} to the chanel
然后您可以从错误通道中读取所有错误并找出失败的原因。
另一种选择是根本不使用错误组。这使事情更加明确,但它是否更好是值得商榷的:
// Keep entity statuses
statuses:=make([]Status,len(entity))
for i, link := range entity {
statuses[i].Entity=link
wg.Add(1)
go func(i index) {
defer wg.Done()
ctx, cancel:=context.WithCancel(context.Background())
defer cancel()
// Error collector
status:=make(chan error)
defer close(status)
go func() {
for st:=range status {
if st!=nil {
cancel() // Stop all calls
// store first error
if statuses[i].Err==nil {
statuses[i].Err=st
}
}
}
}()
innerWg:=sync.WaitGroup{}
innerWg.Add(1)
go func() {
defer innerWg.Done()
status<- makeHttpCall(ctx)
}()
innerWg.Add(1)
go func() {
defer innerWg.Done()
status<- makeHttpCall(ctx)
}()
...
innerWg.Wait()
}(i)
}
当一切都完成后,statuses
将包含所有实体和相应的状态。