Golang for-select 爆炸 CPU

Golang for-select blows up CPU

我有一个 grpc 基准测试代码,它使用一个函数使用 for-select 子句将数百个 goroutine 通道合并到一个通道。代码是这样的

     func (b *B) merge(
          ctx context.Context,
          nodes ...<-chan *pb.Node,
        ) chan *pb.Node {
    allNodes := make(chan *pb.Node)
    var wg sync.WaitGroup
    wg.Add(len(nodes))
    for _, n := range nodes {
        go func(n <-chan *pb.Node) {
            defer wg.Done()
            for {
                select {
                case <-ctx.Done():
                    return
                case val, ok := <-n:
                    if ok {
                        allNodes <- val
                    }
                }
            }
        }(n)
    }
    go func() {
        wg.Wait()
        close(allNodes)
    }()
    return allNodes
}

当我在 ubuntu 16.04 中通过 top 命令监视代码时,我看到 2 核服务器疯狂旋转,超过 cpu 使用率的 196%。

然后我使用 pprof 包分析我的代码,它说我的 98% cpu 自旋这个函数,而 top 函数生成这样的结果

    flat  flat%   sum%        cum   cum%
   1640ms  5.78%  5.78%    27700ms 97.60%  B (*B).merge.func1
    5560ms 19.59% 25.37%    22130ms 77.98%  runtime.selectgo
     770ms  2.71% 28.08%    11190ms 39.43%  runtime.sellock
    2700ms  9.51% 37.60%    10430ms 36.75%  runtime.lock
    7710ms 27.17% 64.76%     7710ms 27.17%  runtime.procyield
     460ms  1.62% 66.38%     3850ms 13.57%  context.(*cancelCtx).Done
    1210ms  4.26% 70.65%     3350ms 11.80%  runtime.selunlock
    2700ms  9.51% 80.16%     2900ms 10.22%  sync.(*Mutex).Lock
    2110ms  7.43% 87.60%     2140ms  7.54%  runtime.unlock
     360ms  1.27% 88.87%      860ms  3.03%  runtime.typedmemclr

任何人都可以给我一些关于如何编写正确的代码来合并大量通道的建议似乎这个 for-select 块只是让 cpu 变得疯狂,在它后面使用 procyield哪个机制不是很有前途?

有没有办法控制进程的cpu使用?

似乎很可能是 nodes 参数中传递的通道在上下文被取消之前被关闭了。这会将您的 for 循环变成一个紧密循环,它会消耗所有可用的 CPU。由于通道一旦关闭就无法重新打开,一旦 ok 为假,您可以从 goroutine 安全地 return,这应该可以解决该问题:

    go func(n <-chan *pb.Node) {
        defer wg.Done()
        for {
            select {
            case <-ctx.Done():
                return
            case val, ok := <-n:
                if !ok {
                    return
                }
                allNodes <- val
            }
        }
    }(n)

关闭的通道不会阻塞 - 请参阅 https://dave.cheney.net/2013/04/30/curious-channels

关闭后将您的频道设置为零。

  case val, ok := <-n:
    if ok {
      allNodes <- val
    } else {
      n = nil
    }

然后 select 将只阻塞等待完成消息。