goroutines 管道中的死锁

Deadlock in goroutines pipeline

我需要你的帮助来理解为什么我的 readFromWorker 函数会导致死锁。当我注释掉如下所示的行时,它可以正常工作(因此我知道问题出在这里)。

全在这里https://play.golang.org/p/-0mRDAeD2tr

非常感谢您的帮助

func readFromWorker(inCh <-chan *data, wg *sync.WaitGroup) {
    defer func() {
        wg.Done()
    }()

    //stageIn1 := make(chan *data)
    //stageOut1 := make(chan *data)

    for v := range inCh {
        fmt.Println("v", v)

        //stageIn1 <- v
    }

    //go stage1(stageIn1, stageOut1)
    //go stage2(stageOut1)
}

我已经评论了你做错的相关部分。另外,我建议考虑一个更好的模式。

请记住,频道上的 for range 不会停止循环,除非为正在循环的同一频道调用 close。此外,关闭通道的经验法则是发送到通道的发送者也必须关闭它,因为发送到关闭的通道会导致 panic.

此外,在使用无缓冲和缓冲通道时要非常小心。对于无缓冲通道,发送方和接收方必须准备就绪,否则在您的情况下也会发生死锁。

package main

import (
    "fmt"
    "sync"
)

type data struct {
    id    int
    url   string
    field int
}

type job struct {
    id  int
    url string
}

func sendToWorker(id int, inCh <-chan job, outCh chan<- *data, wg *sync.WaitGroup) {
    // wg.Done() is itself a function call, no need to wrap it inside
    // an anonymous function just to use defer.
    defer wg.Done()

    for v := range inCh {
        // some pre process stuff and then pass to pipeline
        outCh <- &data{id: v.id, url: v.url}
    }
}

func readFromWorker(inCh <-chan *data, wg *sync.WaitGroup) {
    // wg.Done() is itself a function call, no need to wrap it inside
    // an anonymous function just to use defer.
    defer wg.Done()

    var (
        stageIn1  = make(chan *data)
        stageOut1 = make(chan *data)
    )

    // Spawn the goroutines so that there's no deadlock
    // as the sender and receiver both should be ready
    // when using unbuffered channels.
    go stage1(stageIn1, stageOut1)
    go stage2(stageOut1)

    for v := range inCh {
        fmt.Println("v", v)
        stageIn1 <- v
    }
    close(stageIn1)
}

func stage1(in <-chan *data, out chan<- *data) {
    for s := range in {
        fmt.Println("stage1 = ", s)
        out <- s
    }
    // Close the out channel
    close(out)
}

func stage2(out <-chan *data) {
    // Loop until close
    for s := range out {
        fmt.Println("stage2 = ", s)
    }
}

func main() {
    const chanBuffer = 1

    var (
        inputsCh  = make(chan job, chanBuffer)
        resultsCh = make(chan *data, chanBuffer)

        wgInput  sync.WaitGroup
        wgResult sync.WaitGroup
    )

    for i := 1; i <= 4; i++ {
        wgInput.Add(1)
        go sendToWorker(i, inputsCh, resultsCh, &wgInput)
    }

    wgResult.Add(1)
    go readFromWorker(resultsCh, &wgResult)

    for j := 1; j <= 10; j++ {
        inputsCh <- job{id: j, url: "google.com"}
    }

    close(inputsCh)
    wgInput.Wait()
    close(resultsCh)
    wgResult.Wait()
}