Goroutines 节流示例

Goroutines throttle example

我正在通过 Udemy 课程学习基础围棋。在 goroutines 部分,有一个节流的例子,它让我理解了等待组是如何工作的。

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

func main() {
    c1 := make(chan int)
    c2 := make(chan int)

    go populate(c1)

    go fanOutIn(c1, c2)

    for v := range c2 {
        fmt.Println(v)
    }

    fmt.Println("about to exit")
}

func populate(c chan int) {
    for i := 0; i < 100; i++ {
        c <- i
    }
    close(c)
}

func fanOutIn(c1, c2 chan int) {
    var wg sync.WaitGroup
    const goroutines = 10
    wg.Add(goroutines) 
    for i := 0; i < goroutines; i++ {
        go func() {
            for v := range c1 {
                func(v2 int) {
                    c2 <- timeConsumingWork(v2)
                }(v)
            }
            wg.Done()
        }()
    }
    wg.Wait()
    close(c2)
}

func timeConsumingWork(n int) int {
    time.Sleep(time.Microsecond * time.Duration(rand.Intn(500)))
    return n + rand.Intn(1000)
}

与我的理解不符的部分在函数 fanOutIn 中,我们在其中设置了 WaitGroupAdd(10).

为什么我打印出 100 个值?只能将单个值 (i := 0) 放入 c1,并且该值永远不会明确地从通道中删除。然后代码命中 wg.Done(),等待组队列减少到 9 等等。

根据我目前的理解,我希望看到 0 + rand.Intn(1000) 的 10 个值。

拆分出来的函数如下(包括前面的go和调用它的括号):

go func() {
    for v := range c1 {
        func(v2 int) {
            c2 <- timeConsumingWork(v2)
        }(v)
    }
    wg.Done()
}()

这段代码有点怪异。让我们进一步缩小它,丢弃 wg.Done 并只保留 for 循环本身:

for v := range c1 {
    func(v2 int) {
        c2 <- timeConsumingWork(v2)
    }(v)
}

有一个内部未命名函数在这里毫无用处;我们可以在不改变程序行为的情况下丢弃它,得到:

for v := range c1 {
    c2 <- timeConsumingWork(v)
}

这终于是一个简单的循环。现在的一个关键问题是:您希望从这个循环中进行多少次迭代? 注意:它不一定是任何 常数 数字。也许更好的表达问题的方式是:这个循环什么时候结束?

for循环读取一个频道。当从通道读取表明没有更多数据,即通道关闭且其队列为空时,这种循环结束。 (参见 the Go specification section on for loops。)

所以这个最内层的循环 for v := range c1 不会终止,直到通道 c1 关闭并且其队列中没有更多数据。此频道创建于:

c1 := make(chan int)

所以它没有队列,所以我们甚至不需要考虑:它在 close(c1) 关闭后终止。 您现在应该寻找关闭 c1close

我们的收盘在哪里?

这里是关门的地方c1:

func populate(c chan int) {
    for i := 0; i < 100; i++ {
        c <- i
    }
    close(c)
}

我们用 c1 作为它的参数来调用它,所以它的最终 close(c) 关闭 c1。现在你可以问:我们什么时候到达这个 close 调用? 答案很明显:在循环中的 i >= 100 之后,即在我们发送了 100 个值之后, 零到 99 分别进入通道 c1.

fanOutIn 所做的是分拆 10 个 goroutines。 10 个 goroutine 运行 中的每一个都是我在上面引用的第一个匿名函数。该匿名函数有一个循环 运行 不确定次数,重复直到通道 c1 关闭。循环中的每一次旅行都会获取通道的一个值,因此最初,如果 10 个 goroutine 都设法在有任何可用值之前启动,则所有 10 个 goroutine 都将等待值。

当生产者函数将一个值放入通道时,十个等待的 goroutines 中的 one 将获取并开始使用它。如果那个 goroutine 需要很长时间才能回到它自己的 for 循环的顶部,另一个 goroutine 将获取下一个产生的值。所以这里发生的是最多十个 produced 值通过通道传播到最多十个 goroutines。1 每个(最多十个) goroutines 花费一些不平凡的时间来使用它的值,然后将最终产品值发送到通道 c2 并返回到它自己的无限期 for 循环的顶部。

只有当生产者关闭它的通道c(这是我们这里的c1)时,十个goroutines才会看到一个closed-channel-empty -queue,允许他们退出 for 循环。当他们确实退出他们的 for 循环时,他们每个人都会调用 wg.Done() (每个一次)并终止。

因此,一旦 close(c1) 发生(通过 populate 中的 close(c)),最终 所有十个匿名 goroutines 将已致电 wg.Done()。届时,fanOutIn 中的 wg.Wait() 将 return。这将从 fanOutIn 调用 close(c2) 和 return,同时终止 that goroutine。

同时,在 main 中,我们使用 for v := range c2 从通道 c2 读取数据。当十个 goroutine 中的任何一个将值写入 c2 时,这个 for 循环将 运行。它只会在 c2 本身关闭时退出(它的队列也必须为空,但同样 c2 有一个零长度队列)。所以 main 不会继续 过去 for 循环直到 c2 关闭,这不会发生直到 wg.Wait() return s,直到发生十次 wg.Done() 调用才会发生,直到通道 c1 关闭才会发生。

这意味着 mainpopulate 调用 close(c) 之前无法通过它自己的 for 循环,并且只有在恰好生成 100 个值之后才会发生这种情况。


1正如所讨论的 ,这里的短语 up to 可能很重要:我们真的不知道如何许多 goroutines 真的会消耗值。很大程度上取决于每个 goroutine 的工作量、工作类型以及您的 Go 运行time 可用的 CPU 数量。