工作池上的通道死锁
Channel deadlock on workerpool
我正在通过创建一个包含 1000 个工作人员的工作池来使用频道。目前我收到以下错误:
fatal error: all goroutines are asleep - deadlock!
这是我的代码:
package main
import "fmt"
import "time"
func worker(id int, jobs <-chan int, results chan<- int) {
for j := range jobs {
fmt.Println("worker", id, "started job", j)
time.Sleep(time.Second)
fmt.Println("worker", id, "finished job", j)
results <- j * 2
}
}
func main() {
jobs := make(chan int, 100)
results := make(chan int, 100)
for w := 1; w <= 1000; w++ {
go worker(w, jobs, results)
}
for j := 1; j < 1000000; j++ {
jobs <- j
}
close(jobs)
fmt.Println("==========CLOSED==============")
for i:=0;i<len(results);i++ {
<-results
}
}
为什么会这样?我还是个新手,我希望能理解这一点。
以下代码:
for j := 1; j < 1000000; j++ {
jobs <- j
}
应该运行在一个单独的 goroutine 中,因为所有的 worker 都会阻塞等待主 gorourine 在结果通道上接收,而主 goroutine 卡在循环中。
问题是您的频道已满。 main()
例程在读取任何结果之前尝试将所有作业放入 jobs
通道。但是 results
通道只有 space 的 100 个结果,在任何写入通道之前都会阻塞,所以所有的工作人员最终都会阻塞等待这个通道中的 space – space那永远不会到来,因为 main()
还没有开始阅读 results
。
要快速解决此问题,您可以使 jobs
足够大以容纳所有作业,以便 main()
函数可以继续读取阶段;或者你可以使 results
足够大以容纳所有结果,这样工作人员就可以无阻塞地输出他们的结果。
更好的方法是创建另一个 goroutine 来填充 jobs
队列,这样 main()
可以直接读取结果:
func main() {
jobs := make(chan int, 100)
results := make(chan int, 100)
for w := 1; w <= 1000; w++ {
go worker(w, jobs, results)
}
go func() {
for j := 1; j < 1000000; j++ {
jobs <- j
}
close(jobs)
fmt.Println("==========CLOSED==============")
}
for i := 1; i < 1000000; i++ {
<-results
}
}
请注意,我必须将最终的 for
循环更改为固定的迭代次数,否则它可能会在读取所有结果之前终止。
虽然 Thomas 的回答基本上是正确的,但我 post 我的版本是 IMO 更好的 Go,也适用于无缓冲通道:
func main() {
jobs := make(chan int)
results := make(chan int)
var wg sync.WaitGroup
// you could init the WaitGroup's count here with one call but this is error
// prone - if you change the loop's size you could forget to change the
// WG's count. So call wg.Add in loop
//wg.Add(1000)
for w := 1; w <= 1000; w++ {
wg.Add(1)
go func() {
defer wg.Done()
worker(w, jobs, results)
}()
}
go func() {
for j := 1; j < 2000; j++ {
jobs <- j
}
close(jobs)
fmt.Println("==========CLOSED==============")
}()
// in this gorutine we wait until all "producer" routines are done
// then close the results channel so that the consumer loop stops
go func() {
wg.Wait()
close(results)
}()
for i := range results {
fmt.Print(i, " ")
}
fmt.Println("==========DONE==============")
}
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for j := range jobs {
fmt.Println("worker", id, "started job", j)
time.Sleep(time.Millisecond * time.Duration(10))
fmt.Println("worker", id, "finished job", j)
results <- j * 2
}
}
func main() {
jobs := make(chan int, 100)
results := make(chan int, 100)
wg := new(sync.WaitGroup)
wg.Add(1000)
for w := 1; w <= 1000; w++ {
go worker(w, jobs, results, wg)
}
go func() {
wg.Wait()
close(results)
}()
go func() {
for j := 1; j < 1000000; j++ {
jobs <- j
}
close(jobs)
}()
sum := 0
for v := range results {
sum += v
}
fmt.Println("==========CLOSED==============")
fmt.Println("sum", sum)
}
我正在通过创建一个包含 1000 个工作人员的工作池来使用频道。目前我收到以下错误:
fatal error: all goroutines are asleep - deadlock!
这是我的代码:
package main
import "fmt"
import "time"
func worker(id int, jobs <-chan int, results chan<- int) {
for j := range jobs {
fmt.Println("worker", id, "started job", j)
time.Sleep(time.Second)
fmt.Println("worker", id, "finished job", j)
results <- j * 2
}
}
func main() {
jobs := make(chan int, 100)
results := make(chan int, 100)
for w := 1; w <= 1000; w++ {
go worker(w, jobs, results)
}
for j := 1; j < 1000000; j++ {
jobs <- j
}
close(jobs)
fmt.Println("==========CLOSED==============")
for i:=0;i<len(results);i++ {
<-results
}
}
为什么会这样?我还是个新手,我希望能理解这一点。
以下代码:
for j := 1; j < 1000000; j++ {
jobs <- j
}
应该运行在一个单独的 goroutine 中,因为所有的 worker 都会阻塞等待主 gorourine 在结果通道上接收,而主 goroutine 卡在循环中。
问题是您的频道已满。 main()
例程在读取任何结果之前尝试将所有作业放入 jobs
通道。但是 results
通道只有 space 的 100 个结果,在任何写入通道之前都会阻塞,所以所有的工作人员最终都会阻塞等待这个通道中的 space – space那永远不会到来,因为 main()
还没有开始阅读 results
。
要快速解决此问题,您可以使 jobs
足够大以容纳所有作业,以便 main()
函数可以继续读取阶段;或者你可以使 results
足够大以容纳所有结果,这样工作人员就可以无阻塞地输出他们的结果。
更好的方法是创建另一个 goroutine 来填充 jobs
队列,这样 main()
可以直接读取结果:
func main() {
jobs := make(chan int, 100)
results := make(chan int, 100)
for w := 1; w <= 1000; w++ {
go worker(w, jobs, results)
}
go func() {
for j := 1; j < 1000000; j++ {
jobs <- j
}
close(jobs)
fmt.Println("==========CLOSED==============")
}
for i := 1; i < 1000000; i++ {
<-results
}
}
请注意,我必须将最终的 for
循环更改为固定的迭代次数,否则它可能会在读取所有结果之前终止。
虽然 Thomas 的回答基本上是正确的,但我 post 我的版本是 IMO 更好的 Go,也适用于无缓冲通道:
func main() {
jobs := make(chan int)
results := make(chan int)
var wg sync.WaitGroup
// you could init the WaitGroup's count here with one call but this is error
// prone - if you change the loop's size you could forget to change the
// WG's count. So call wg.Add in loop
//wg.Add(1000)
for w := 1; w <= 1000; w++ {
wg.Add(1)
go func() {
defer wg.Done()
worker(w, jobs, results)
}()
}
go func() {
for j := 1; j < 2000; j++ {
jobs <- j
}
close(jobs)
fmt.Println("==========CLOSED==============")
}()
// in this gorutine we wait until all "producer" routines are done
// then close the results channel so that the consumer loop stops
go func() {
wg.Wait()
close(results)
}()
for i := range results {
fmt.Print(i, " ")
}
fmt.Println("==========DONE==============")
}
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for j := range jobs {
fmt.Println("worker", id, "started job", j)
time.Sleep(time.Millisecond * time.Duration(10))
fmt.Println("worker", id, "finished job", j)
results <- j * 2
}
}
func main() {
jobs := make(chan int, 100)
results := make(chan int, 100)
wg := new(sync.WaitGroup)
wg.Add(1000)
for w := 1; w <= 1000; w++ {
go worker(w, jobs, results, wg)
}
go func() {
wg.Wait()
close(results)
}()
go func() {
for j := 1; j < 1000000; j++ {
jobs <- j
}
close(jobs)
}()
sum := 0
for v := range results {
sum += v
}
fmt.Println("==========CLOSED==============")
fmt.Println("sum", sum)
}