goroutines 管道中的死锁
Deadlock in goroutines pipeline
我需要你的帮助来理解为什么我的 readFromWorker
函数会导致死锁。当我注释掉如下所示的行时,它可以正常工作(因此我知道问题出在这里)。
全在这里https://play.golang.org/p/-0mRDAeD2tr
非常感谢您的帮助
func readFromWorker(inCh <-chan *data, wg *sync.WaitGroup) {
defer func() {
wg.Done()
}()
//stageIn1 := make(chan *data)
//stageOut1 := make(chan *data)
for v := range inCh {
fmt.Println("v", v)
//stageIn1 <- v
}
//go stage1(stageIn1, stageOut1)
//go stage2(stageOut1)
}
我已经评论了你做错的相关部分。另外,我建议考虑一个更好的模式。
请记住,频道上的 for range
不会停止循环,除非为正在循环的同一频道调用 close
。此外,关闭通道的经验法则是发送到通道的发送者也必须关闭它,因为发送到关闭的通道会导致 panic
.
此外,在使用无缓冲和缓冲通道时要非常小心。对于无缓冲通道,发送方和接收方必须准备就绪,否则在您的情况下也会发生死锁。
package main
import (
"fmt"
"sync"
)
type data struct {
id int
url string
field int
}
type job struct {
id int
url string
}
func sendToWorker(id int, inCh <-chan job, outCh chan<- *data, wg *sync.WaitGroup) {
// wg.Done() is itself a function call, no need to wrap it inside
// an anonymous function just to use defer.
defer wg.Done()
for v := range inCh {
// some pre process stuff and then pass to pipeline
outCh <- &data{id: v.id, url: v.url}
}
}
func readFromWorker(inCh <-chan *data, wg *sync.WaitGroup) {
// wg.Done() is itself a function call, no need to wrap it inside
// an anonymous function just to use defer.
defer wg.Done()
var (
stageIn1 = make(chan *data)
stageOut1 = make(chan *data)
)
// Spawn the goroutines so that there's no deadlock
// as the sender and receiver both should be ready
// when using unbuffered channels.
go stage1(stageIn1, stageOut1)
go stage2(stageOut1)
for v := range inCh {
fmt.Println("v", v)
stageIn1 <- v
}
close(stageIn1)
}
func stage1(in <-chan *data, out chan<- *data) {
for s := range in {
fmt.Println("stage1 = ", s)
out <- s
}
// Close the out channel
close(out)
}
func stage2(out <-chan *data) {
// Loop until close
for s := range out {
fmt.Println("stage2 = ", s)
}
}
func main() {
const chanBuffer = 1
var (
inputsCh = make(chan job, chanBuffer)
resultsCh = make(chan *data, chanBuffer)
wgInput sync.WaitGroup
wgResult sync.WaitGroup
)
for i := 1; i <= 4; i++ {
wgInput.Add(1)
go sendToWorker(i, inputsCh, resultsCh, &wgInput)
}
wgResult.Add(1)
go readFromWorker(resultsCh, &wgResult)
for j := 1; j <= 10; j++ {
inputsCh <- job{id: j, url: "google.com"}
}
close(inputsCh)
wgInput.Wait()
close(resultsCh)
wgResult.Wait()
}
我需要你的帮助来理解为什么我的 readFromWorker
函数会导致死锁。当我注释掉如下所示的行时,它可以正常工作(因此我知道问题出在这里)。
全在这里https://play.golang.org/p/-0mRDAeD2tr
非常感谢您的帮助
func readFromWorker(inCh <-chan *data, wg *sync.WaitGroup) {
defer func() {
wg.Done()
}()
//stageIn1 := make(chan *data)
//stageOut1 := make(chan *data)
for v := range inCh {
fmt.Println("v", v)
//stageIn1 <- v
}
//go stage1(stageIn1, stageOut1)
//go stage2(stageOut1)
}
我已经评论了你做错的相关部分。另外,我建议考虑一个更好的模式。
请记住,频道上的 for range
不会停止循环,除非为正在循环的同一频道调用 close
。此外,关闭通道的经验法则是发送到通道的发送者也必须关闭它,因为发送到关闭的通道会导致 panic
.
此外,在使用无缓冲和缓冲通道时要非常小心。对于无缓冲通道,发送方和接收方必须准备就绪,否则在您的情况下也会发生死锁。
package main
import (
"fmt"
"sync"
)
type data struct {
id int
url string
field int
}
type job struct {
id int
url string
}
func sendToWorker(id int, inCh <-chan job, outCh chan<- *data, wg *sync.WaitGroup) {
// wg.Done() is itself a function call, no need to wrap it inside
// an anonymous function just to use defer.
defer wg.Done()
for v := range inCh {
// some pre process stuff and then pass to pipeline
outCh <- &data{id: v.id, url: v.url}
}
}
func readFromWorker(inCh <-chan *data, wg *sync.WaitGroup) {
// wg.Done() is itself a function call, no need to wrap it inside
// an anonymous function just to use defer.
defer wg.Done()
var (
stageIn1 = make(chan *data)
stageOut1 = make(chan *data)
)
// Spawn the goroutines so that there's no deadlock
// as the sender and receiver both should be ready
// when using unbuffered channels.
go stage1(stageIn1, stageOut1)
go stage2(stageOut1)
for v := range inCh {
fmt.Println("v", v)
stageIn1 <- v
}
close(stageIn1)
}
func stage1(in <-chan *data, out chan<- *data) {
for s := range in {
fmt.Println("stage1 = ", s)
out <- s
}
// Close the out channel
close(out)
}
func stage2(out <-chan *data) {
// Loop until close
for s := range out {
fmt.Println("stage2 = ", s)
}
}
func main() {
const chanBuffer = 1
var (
inputsCh = make(chan job, chanBuffer)
resultsCh = make(chan *data, chanBuffer)
wgInput sync.WaitGroup
wgResult sync.WaitGroup
)
for i := 1; i <= 4; i++ {
wgInput.Add(1)
go sendToWorker(i, inputsCh, resultsCh, &wgInput)
}
wgResult.Add(1)
go readFromWorker(resultsCh, &wgResult)
for j := 1; j <= 10; j++ {
inputsCh <- job{id: j, url: "google.com"}
}
close(inputsCh)
wgInput.Wait()
close(resultsCh)
wgResult.Wait()
}