在 Goroutine 中延迟调用 sync.WaitGroup.Wait():为什么要这样做?
Deferred call to sync.WaitGroup.Wait() in Goroutine: why should this work?
我正在尝试理解 vegeta
负载测试 tool/library 的源代码中的 Attack()
函数 (https://github.com/tsenart/vegeta/blob/44a49c878dd6f28f04b9b5ce5751490b0dce1e18/lib/attack.go#L253-L312)。我创建了一个简化示例:
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go attack(&wg)
}
// wg.Wait()
go func() {
defer wg.Wait()
}()
}
func attack(wg *sync.WaitGroup) {
defer wg.Done()
time.Sleep(1 * time.Second)
fmt.Println("foobar")
}
我注意到这个函数立即 returns 而没有打印 foobar
10 次。只有在 wg.Wait()
行中的评论,我才看到 foobar
在 1 秒后打印了 10 次。这对我来说很有意义,因为调用 wg.Wait()
之前的 main()
函数 returns。
那么,我不明白 Attack()
方法在 vegeta
中是如何工作的,因为它似乎遵循类似的模式:
func (a *Attacker) Attack(tr Targeter, p Pacer, du time.Duration, name string) <-chan *Result {
var wg sync.WaitGroup
workers := a.workers
if workers > a.maxWorkers {
workers = a.maxWorkers
}
results := make(chan *Result)
ticks := make(chan struct{})
for i := uint64(0); i < workers; i++ {
wg.Add(1)
go a.attack(tr, name, &wg, ticks, results)
}
go func() {
defer close(results)
defer wg.Wait()
defer close(ticks)
began, count := time.Now(), uint64(0)
for {
elapsed := time.Since(began)
if du > 0 && elapsed > du {
return
}
wait, stop := p.Pace(elapsed, count)
if stop {
return
}
time.Sleep(wait)
if workers < a.maxWorkers {
select {
case ticks <- struct{}{}:
count++
continue
case <-a.stopch:
return
default:
// all workers are blocked. start one more and try again
workers++
wg.Add(1)
go a.attack(tr, name, &wg, ticks, results)
}
}
select {
case ticks <- struct{}{}:
count++
case <-a.stopch:
return
}
}
}()
return results
}
其中 attack()
方法读取
func (a *Attacker) attack(tr Targeter, name string, workers *sync.WaitGroup, ticks <-chan struct{}, results chan<- *Result) {
defer workers.Done()
for range ticks {
results <- a.hit(tr, name)
}
}
我不明白为什么 Attack()
函数在不调用 attack()
的情况下不会立即 return,因为它的 wg.Wait()
在 Goroutine 中?
vegeta 的 Attack
也立即 returns,但有一个通道,该通道由剩余的 goroutines 填充 运行。
一旦这些完成,通道将关闭 (defer close(results)
) 启用具有 result
的代码来检测完成。
示例;
package main
import (
"fmt"
"sync"
"time"
)
func main() {
results := attacks()
fmt.Println("attacks returned")
for result := range results {
fmt.Println(result)
}
}
func attacks() chan string {
// A channel to hold the results
c := make(chan string)
// Fire 10 routines populating the channel
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
attack(c)
wg.Done()
}()
}
// Close channel once routines are finished
go func() {
wg.Wait()
close(c)
}()
//
return c
}
func attack(c chan<- string) {
time.Sleep(1 * time.Second)
c <- "foobar"
}
我正在尝试理解 vegeta
负载测试 tool/library 的源代码中的 Attack()
函数 (https://github.com/tsenart/vegeta/blob/44a49c878dd6f28f04b9b5ce5751490b0dce1e18/lib/attack.go#L253-L312)。我创建了一个简化示例:
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go attack(&wg)
}
// wg.Wait()
go func() {
defer wg.Wait()
}()
}
func attack(wg *sync.WaitGroup) {
defer wg.Done()
time.Sleep(1 * time.Second)
fmt.Println("foobar")
}
我注意到这个函数立即 returns 而没有打印 foobar
10 次。只有在 wg.Wait()
行中的评论,我才看到 foobar
在 1 秒后打印了 10 次。这对我来说很有意义,因为调用 wg.Wait()
之前的 main()
函数 returns。
那么,我不明白 Attack()
方法在 vegeta
中是如何工作的,因为它似乎遵循类似的模式:
func (a *Attacker) Attack(tr Targeter, p Pacer, du time.Duration, name string) <-chan *Result {
var wg sync.WaitGroup
workers := a.workers
if workers > a.maxWorkers {
workers = a.maxWorkers
}
results := make(chan *Result)
ticks := make(chan struct{})
for i := uint64(0); i < workers; i++ {
wg.Add(1)
go a.attack(tr, name, &wg, ticks, results)
}
go func() {
defer close(results)
defer wg.Wait()
defer close(ticks)
began, count := time.Now(), uint64(0)
for {
elapsed := time.Since(began)
if du > 0 && elapsed > du {
return
}
wait, stop := p.Pace(elapsed, count)
if stop {
return
}
time.Sleep(wait)
if workers < a.maxWorkers {
select {
case ticks <- struct{}{}:
count++
continue
case <-a.stopch:
return
default:
// all workers are blocked. start one more and try again
workers++
wg.Add(1)
go a.attack(tr, name, &wg, ticks, results)
}
}
select {
case ticks <- struct{}{}:
count++
case <-a.stopch:
return
}
}
}()
return results
}
其中 attack()
方法读取
func (a *Attacker) attack(tr Targeter, name string, workers *sync.WaitGroup, ticks <-chan struct{}, results chan<- *Result) {
defer workers.Done()
for range ticks {
results <- a.hit(tr, name)
}
}
我不明白为什么 Attack()
函数在不调用 attack()
的情况下不会立即 return,因为它的 wg.Wait()
在 Goroutine 中?
vegeta 的 Attack
也立即 returns,但有一个通道,该通道由剩余的 goroutines 填充 运行。
一旦这些完成,通道将关闭 (defer close(results)
) 启用具有 result
的代码来检测完成。
示例;
package main
import (
"fmt"
"sync"
"time"
)
func main() {
results := attacks()
fmt.Println("attacks returned")
for result := range results {
fmt.Println(result)
}
}
func attacks() chan string {
// A channel to hold the results
c := make(chan string)
// Fire 10 routines populating the channel
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
attack(c)
wg.Done()
}()
}
// Close channel once routines are finished
go func() {
wg.Wait()
close(c)
}()
//
return c
}
func attack(c chan<- string) {
time.Sleep(1 * time.Second)
c <- "foobar"
}