嵌套的 errgroup 在一堆 goroutines 中

Nested errgroup inside bunch of goroutines

我对 golang 及其并发原则还很陌生。我的用例涉及对一批实体执行多个 http 请求(针对单个实体)。如果某个实体的任何 http 请求失败,我需要停止对它的所有并行 http 请求。此外,我必须管理因错误而失败的实体数。我正在尝试在实体 goroutine 中实现 errorgroup,这样如果单个实体的任何 http 请求失败,errorgroup 就会终止,并向其父 goroutine 发送 return 错误。但我不确定如何维护错误计数。

func main(entity[] string) {
    errorC := make(chan string) // channel to insert failed entity
    var wg sync.WaitGroup

    for _, link := range entity {
        wg.Add(1)
        // Spawn errorgroup here. errorgroup_spawn
    }

    go func() {
        wg.Wait()   
        close(errorC)    
    }()

    for msg := range errorC {
        // here storing error entityIds somewhere.
    }
}

和这样的错误组

func errorgroup_spawn(ctx context.Context, errorC chan string, wg *sync.WaitGroup) { // and other params
    defer (*wg).Done()
    
   goRoutineCollection, ctxx := errgroup.WithContext(ctx)
    results := make(chan *result)
    goRoutineCollection.Go(func() error {
        // http calls for single entity
        // if error occurs, push it in errorC, and return Error.
        return nil
    })

    go func() {
        goRoutineCollection.Wait()
        close(result)
    }()

   return goRoutineCollection.Wait()
}

PS: 我也在考虑应用嵌套错误组,但想不出维护错误计数,而 运行 其他错误组 谁能指导我,这是处理此类现实世界场景的正确方法吗?

跟踪错误的一种方法是使用状态结构来跟踪哪个错误来自哪里:

type Status struct {
   Entity string
   Err error
}
...

errorC := make(chan Status) 

// Spawn error groups with name of the entity, and when error happens, push Status{Entity:entityName,Err:err} to the chanel

然后您可以从错误通道中读取所有错误并找出失败的原因。

另一种选择是根本不使用错误组。这使事情更加明确,但它是否更好是值得商榷的:

// Keep entity statuses
statuses:=make([]Status,len(entity))
for i, link := range entity {
   statuses[i].Entity=link
   wg.Add(1)
   go func(i index) {
      defer wg.Done()
      ctx, cancel:=context.WithCancel(context.Background())
      defer cancel()

      // Error collector
      status:=make(chan error)
      defer close(status)
      go func() {
         for st:=range status {
             if st!=nil {
                cancel()  // Stop all calls 
                // store first error
                if statuses[i].Err==nil {
                   statuses[i].Err=st
                }
             }
         }
      }()

      innerWg:=sync.WaitGroup{}
      innerWg.Add(1)
      go func() {
         defer innerWg.Done()
         status<- makeHttpCall(ctx)
      }()
      innerWg.Add(1)
      go func() {
         defer innerWg.Done()
         status<- makeHttpCall(ctx)
      }()
      ...
      innerWg.Wait()

   }(i)
}

当一切都完成后,statuses将包含所有实体和相应的状态。