在 Goroutine 中延迟调用 sync.WaitGroup.Wait():为什么要这样做?

Deferred call to sync.WaitGroup.Wait() in Goroutine: why should this work?

我正在尝试理解 vegeta 负载测试 tool/library 的源代码中的 Attack() 函数 (https://github.com/tsenart/vegeta/blob/44a49c878dd6f28f04b9b5ce5751490b0dce1e18/lib/attack.go#L253-L312)。我创建了一个简化示例:

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go attack(&wg)
    }
    // wg.Wait()

    go func() {
        defer wg.Wait()
    }()
}

func attack(wg *sync.WaitGroup) {
    defer wg.Done()
    time.Sleep(1 * time.Second)
    fmt.Println("foobar")
}

我注意到这个函数立即 returns 而没有打印 foobar 10 次。只有在 wg.Wait() 行中的评论,我才看到 foobar 在 1 秒后打印了 10 次。这对我来说很有意义,因为调用 wg.Wait() 之前的 main() 函数 returns。

那么,我不明白 Attack() 方法在 vegeta 中是如何工作的,因为它似乎遵循类似的模式:

func (a *Attacker) Attack(tr Targeter, p Pacer, du time.Duration, name string) <-chan *Result {
    var wg sync.WaitGroup

    workers := a.workers
    if workers > a.maxWorkers {
        workers = a.maxWorkers
    }

    results := make(chan *Result)
    ticks := make(chan struct{})
    for i := uint64(0); i < workers; i++ {
        wg.Add(1)
        go a.attack(tr, name, &wg, ticks, results)
    }

    go func() {
        defer close(results)
        defer wg.Wait()
        defer close(ticks)

        began, count := time.Now(), uint64(0)
        for {
            elapsed := time.Since(began)
            if du > 0 && elapsed > du {
                return
            }

            wait, stop := p.Pace(elapsed, count)
            if stop {
                return
            }

            time.Sleep(wait)

            if workers < a.maxWorkers {
                select {
                case ticks <- struct{}{}:
                    count++
                    continue
                case <-a.stopch:
                    return
                default:
                    // all workers are blocked. start one more and try again
                    workers++
                    wg.Add(1)
                    go a.attack(tr, name, &wg, ticks, results)
                }
            }

            select {
            case ticks <- struct{}{}:
                count++
            case <-a.stopch:
                return
            }
        }
    }()

    return results
}

其中 attack() 方法读取

func (a *Attacker) attack(tr Targeter, name string, workers *sync.WaitGroup, ticks <-chan struct{}, results chan<- *Result) {
    defer workers.Done()
    for range ticks {
        results <- a.hit(tr, name)
    }
}

我不明白为什么 Attack() 函数在不调用 attack() 的情况下不会立即 return,因为它的 wg.Wait() 在 Goroutine 中?

vegeta 的 Attack 也立即 returns,但有一个通道,该通道由剩余的 goroutines 填充 运行。 一旦这些完成,通道将关闭 (defer close(results)) 启用具有 result 的代码来检测完成。

示例;

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    results := attacks()

    fmt.Println("attacks returned")

    for result := range results {
        fmt.Println(result)
    }
}

func attacks() chan string {
    // A channel to hold the results
    c := make(chan string)

    // Fire 10 routines populating the channel
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            attack(c)
            wg.Done()
        }()
    }

    // Close channel once routines are finished
    go func() {
        wg.Wait()
        close(c)
    }()

    //
    return c
}

func attack(c chan<- string) {
    time.Sleep(1 * time.Second)
    c <- "foobar"
}