工作池上的通道死锁

Channel deadlock on workerpool

我正在通过创建一个包含 1000 个工作人员的工作池来使用频道。目前我收到以下错误:

fatal error: all goroutines are asleep - deadlock!

这是我的代码:

package main

import "fmt"
import "time"


func worker(id int, jobs <-chan int, results chan<- int) {
    for j := range jobs {
        fmt.Println("worker", id, "started  job", j)
        time.Sleep(time.Second)
        fmt.Println("worker", id, "finished job", j)
        results <- j * 2
    }
}

func main() {
    jobs := make(chan int, 100)
    results := make(chan int, 100)

    for w := 1; w <= 1000; w++ {
        go worker(w, jobs, results)
    }

    for j := 1; j < 1000000; j++ {
        jobs <- j
    }
    close(jobs)
    fmt.Println("==========CLOSED==============")

    for i:=0;i<len(results);i++ {
        <-results
    }
}

为什么会这样?我还是个新手,我希望能理解这一点。

以下代码:

    for j := 1; j < 1000000; j++ {
        jobs <- j
    }

应该运行在一个单独的 goroutine 中,因为所有的 worker 都会阻塞等待主 gorourine 在结果通道上接收,而主 goroutine 卡在循环中。

问题是您的频道已满。 main() 例程在读取任何结果之前尝试将所有作业放入 jobs 通道。但是 results 通道只有 space 的 100 个结果,在任何写入通道之前都会阻塞,所以所有的工作人员最终都会阻塞等待这个通道中的 space – space那永远不会到来,因为 main() 还没有开始阅读 results

要快速解决此问题,您可以使 jobs 足够大以容纳所有作业,以便 main() 函数可以继续读取阶段;或者你可以使 results 足够大以容纳所有结果,这样工作人员就可以无阻塞地输出他们的结果。

更好的方法是创建另一个 goroutine 来填充 jobs 队列,这样 main() 可以直接读取结果:

func main() {
    jobs := make(chan int, 100)
    results := make(chan int, 100)

    for w := 1; w <= 1000; w++ {
        go worker(w, jobs, results)
    }

    go func() {
        for j := 1; j < 1000000; j++ {
            jobs <- j
        }
        close(jobs)
        fmt.Println("==========CLOSED==============")
    }

    for i := 1; i < 1000000; i++ {
        <-results
    }
}

请注意,我必须将最终的 for 循环更改为固定的迭代次数,否则它可能会在读取所有结果之前终止。

虽然 Thomas 的回答基本上是正确的,但我 post 我的版本是 IMO 更好的 Go,也适用于无缓冲通道:

func main() {
    jobs := make(chan int)
    results := make(chan int)

    var wg sync.WaitGroup

    // you could init the WaitGroup's count here with one call but this is error
    // prone - if you change the loop's size you could forget to change the
    // WG's count. So call wg.Add in loop
    //wg.Add(1000)
    for w := 1; w <= 1000; w++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            worker(w, jobs, results)
        }()
    }

    go func() {
        for j := 1; j < 2000; j++ {
            jobs <- j
        }
        close(jobs)
        fmt.Println("==========CLOSED==============")
    }()

    // in this gorutine we wait until all "producer" routines are done
    // then close the results channel so that the consumer loop stops
    go func() {
        wg.Wait()
        close(results)
    }()

    for i := range results {
        fmt.Print(i, " ")
    }
    fmt.Println("==========DONE==============")
}
package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()

    for j := range jobs {
        fmt.Println("worker", id, "started  job", j)
        time.Sleep(time.Millisecond * time.Duration(10))
        fmt.Println("worker", id, "finished job", j)
        results <- j * 2
    }
}

func main() {
    jobs := make(chan int, 100)
    results := make(chan int, 100)

    wg := new(sync.WaitGroup)
    wg.Add(1000)

    for w := 1; w <= 1000; w++ {
        go worker(w, jobs, results, wg)
    }

    go func() {
        wg.Wait()
        close(results)
    }()

    go func() {
        for j := 1; j < 1000000; j++ {
            jobs <- j
        }
        close(jobs)
    }()

    sum := 0
    for v := range results {
        sum += v
    }

    fmt.Println("==========CLOSED==============")
    fmt.Println("sum", sum)
}