去例行的生产者消费者模式恐慌
go routine producer-consumer pattern panics
我已经实现了 中提到的 goroutine 的生产者-消费者模式。但它有时会出现恐慌并显示错误消息:"panic: sync: negative WaitGroup counter"。我有如下示例代码:
package main
import (
"bytes"
"encoding/gob"
"log"
_ "net/http/pprof"
"sync"
)
// Test ...
type Test struct {
PropA []int
PropB []int
}
// Clone deep-copies a to b
func Clone(a, b interface{}) {
buff := new(bytes.Buffer)
enc := gob.NewEncoder(buff)
dec := gob.NewDecoder(buff)
enc.Encode(a)
dec.Decode(b)
}
func main() {
test := Test{
PropA: []int{211, 212, 213, 214, 215, 216, 217, 218, 219, 220, 221, 222},
PropB: []int{111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124},
}
var wg, wg2 sync.WaitGroup
ch := make(chan int, 5)
results := make(chan Test, 5)
// start consumers
for i := 0; i < 4; i++ {
wg.Add(1)
go func(ch <-chan int, results chan<- Test) {
defer wg.Done()
for propA := range ch {
var temp Test
Clone(&test, &temp)
temp.PropA = []int{propA}
results <- temp
}
}(ch, results)
}
// start producing
go func(ch chan<- int) {
defer wg.Done()
for _, propA := range test.PropA {
ch <- propA
}
close(ch)
}(ch)
wg2.Add(1)
go func(results <-chan Test) {
defer wg2.Done()
for tt := range results {
log.Printf("finished propA %+v\n", tt.PropA[0])
}
}(results)
wg.Wait() // Wait all consumers to finish processing jobs
// All jobs are processed, no more values will be sent on results:
close(results)
wg2.Wait()
}
当我 运行 以上代码 4-5 次时,它至少会出现一次恐慌。有时,错误消息是 "panic: send on closed channel"。我不明白在生产者完成发送之前通道是如何关闭的,以及为什么 Waitgroup 计数器达到负值。有人可以给我解释一下吗?
编辑
恐慌的堆栈跟踪如下:(以上代码的文件名是 mycode.go
)
panic: send on closed channel
panic: sync: negative WaitGroup counter
goroutine 21 [running]:
sync.(*WaitGroup).Add(0xc420134020, 0xffffffffffffffff)
/usr/local/go/src/sync/waitgroup.go:75 +0x134
sync.(*WaitGroup).Done(0xc420134020)
/usr/local/go/src/sync/waitgroup.go:100 +0x34
panic(0x7622e0, 0x80ffa0)
/usr/local/go/src/runtime/panic.go:491 +0x283
main.main.func1(0xc420134020, 0xc420136090, 0xc420148000, 0xc42014a000)
/home/mycode.go:45 +0x80
created by main.main
/home/mycode.go:39 +0x21d
exit status 2
所以您似乎有两点 wg.Done() 可能是问题所在。
我重构了您的代码以仅放置一点 wg.Done() 并且从您放置的循环中删除了 wg.Add(1) 。
代码 here
您在 wg
上的簿记差了一个,因为您的制作人调用了 wg.Done()
,但没有调用 Add()
来解释它。恐慌与 go 调度程序的可变性有关,但是一旦你看到修复,我相信你会看到如何根据时间获得 "negative WaitGroup counter" 和/或 "send on closed channel"。
修复很简单,只需在启动生产者之前添加 wg.Add()
。
...
wg.Add(1)
// start producing
go func(ch chan<- int) {
defer wg.Done()
for _, propA := range test.PropA {
ch <- propA
}
close(ch)
}(ch)
...
将来,当您看到 "negative WaitGroup counter" 时,可以保证您没有将 1:1 与 Add
的数量匹配到 Done
。
我已经实现了
package main
import (
"bytes"
"encoding/gob"
"log"
_ "net/http/pprof"
"sync"
)
// Test ...
type Test struct {
PropA []int
PropB []int
}
// Clone deep-copies a to b
func Clone(a, b interface{}) {
buff := new(bytes.Buffer)
enc := gob.NewEncoder(buff)
dec := gob.NewDecoder(buff)
enc.Encode(a)
dec.Decode(b)
}
func main() {
test := Test{
PropA: []int{211, 212, 213, 214, 215, 216, 217, 218, 219, 220, 221, 222},
PropB: []int{111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124},
}
var wg, wg2 sync.WaitGroup
ch := make(chan int, 5)
results := make(chan Test, 5)
// start consumers
for i := 0; i < 4; i++ {
wg.Add(1)
go func(ch <-chan int, results chan<- Test) {
defer wg.Done()
for propA := range ch {
var temp Test
Clone(&test, &temp)
temp.PropA = []int{propA}
results <- temp
}
}(ch, results)
}
// start producing
go func(ch chan<- int) {
defer wg.Done()
for _, propA := range test.PropA {
ch <- propA
}
close(ch)
}(ch)
wg2.Add(1)
go func(results <-chan Test) {
defer wg2.Done()
for tt := range results {
log.Printf("finished propA %+v\n", tt.PropA[0])
}
}(results)
wg.Wait() // Wait all consumers to finish processing jobs
// All jobs are processed, no more values will be sent on results:
close(results)
wg2.Wait()
}
当我 运行 以上代码 4-5 次时,它至少会出现一次恐慌。有时,错误消息是 "panic: send on closed channel"。我不明白在生产者完成发送之前通道是如何关闭的,以及为什么 Waitgroup 计数器达到负值。有人可以给我解释一下吗?
编辑
恐慌的堆栈跟踪如下:(以上代码的文件名是 mycode.go
)
panic: send on closed channel
panic: sync: negative WaitGroup counter
goroutine 21 [running]:
sync.(*WaitGroup).Add(0xc420134020, 0xffffffffffffffff)
/usr/local/go/src/sync/waitgroup.go:75 +0x134
sync.(*WaitGroup).Done(0xc420134020)
/usr/local/go/src/sync/waitgroup.go:100 +0x34
panic(0x7622e0, 0x80ffa0)
/usr/local/go/src/runtime/panic.go:491 +0x283
main.main.func1(0xc420134020, 0xc420136090, 0xc420148000, 0xc42014a000)
/home/mycode.go:45 +0x80
created by main.main
/home/mycode.go:39 +0x21d
exit status 2
所以您似乎有两点 wg.Done() 可能是问题所在。 我重构了您的代码以仅放置一点 wg.Done() 并且从您放置的循环中删除了 wg.Add(1) 。 代码 here
您在 wg
上的簿记差了一个,因为您的制作人调用了 wg.Done()
,但没有调用 Add()
来解释它。恐慌与 go 调度程序的可变性有关,但是一旦你看到修复,我相信你会看到如何根据时间获得 "negative WaitGroup counter" 和/或 "send on closed channel"。
修复很简单,只需在启动生产者之前添加 wg.Add()
。
...
wg.Add(1)
// start producing
go func(ch chan<- int) {
defer wg.Done()
for _, propA := range test.PropA {
ch <- propA
}
close(ch)
}(ch)
...
将来,当您看到 "negative WaitGroup counter" 时,可以保证您没有将 1:1 与 Add
的数量匹配到 Done
。