为什么这个 goroutine 不调用 wg.Done()?
Why does this goroutine not call wg.Done()?
假设registerChan 上任意时刻最多有两个元素(工作地址)。然后由于某些原因,下面的代码在最后两个goroutines中没有调用wg.Done()。
func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
var ntasks int
var nOther int // number of inputs (for reduce) or outputs (for map)
switch phase {
case mapPhase:
ntasks = len(mapFiles)
nOther = nReduce
case reducePhase:
ntasks = nReduce
nOther = len(mapFiles)
}
fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, nOther)
const rpcname = "Worker.DoTask"
var wg sync.WaitGroup
for taskNumber := 0; taskNumber < ntasks; taskNumber++ {
file := mapFiles[taskNumber%len(mapFiles)]
taskArgs := DoTaskArgs{jobName, file, phase, taskNumber, nOther}
wg.Add(1)
go func(taskArgs DoTaskArgs) {
workerAddr := <-registerChan
print("hello\n")
// _ = call(workerAddr, rpcname, taskArgs, nil)
registerChan <- workerAddr
wg.Done()
}(taskArgs)
}
wg.Wait()
fmt.Printf("Schedule: %v done\n", phase)
}
如果我把 wg.Done()
放在 registerChan <- workerAddr
之前,它就可以正常工作,我不知道为什么。我也尝试过推迟 wg.Done() 但这似乎不起作用,尽管我希望如此。我想我对 go routines 和 channels 的工作方式有一些误解,这让我感到困惑。
因为它在此处停止:
workerAddr := <-registerChan
对于缓冲频道:
要让这个 workerAddr := <-registerChan
工作:通道 registerChan
必须有一个值;否则,代码将在此处停止等待通道。
我以这种方式运行你的代码(尝试this):
package main
import (
"fmt"
"sync"
)
func main() {
registerChan := make(chan int, 1)
for i := 1; i <= 10; i++ {
wg.Add(1)
go fn(i, registerChan)
}
registerChan <- 0 // seed
wg.Wait()
fmt.Println(<-registerChan)
}
func fn(taskArgs int, registerChan chan int) {
workerAddr := <-registerChan
workerAddr += taskArgs
registerChan <- workerAddr
wg.Done()
}
var wg sync.WaitGroup
输出:
55
解释:
此代码使用通道和 10 个 goroutine 加上一个主 goroutine 将 1 加到 10。
希望对您有所帮助。
当你运行这个语句registerChan <- workerAddr
时,如果通道容量已满你不能添加它,它会阻塞。如果你有一个池,比如 10 个 workerAddr,你可以在调用 schedule
之前将它们全部添加到容量为 10 的缓冲通道中。不要在调用后添加,以保证如果你从通道中取值,之后有 space 再次添加它。在 goroutine 的开头使用 defer
是好的。
假设registerChan 上任意时刻最多有两个元素(工作地址)。然后由于某些原因,下面的代码在最后两个goroutines中没有调用wg.Done()。
func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
var ntasks int
var nOther int // number of inputs (for reduce) or outputs (for map)
switch phase {
case mapPhase:
ntasks = len(mapFiles)
nOther = nReduce
case reducePhase:
ntasks = nReduce
nOther = len(mapFiles)
}
fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, nOther)
const rpcname = "Worker.DoTask"
var wg sync.WaitGroup
for taskNumber := 0; taskNumber < ntasks; taskNumber++ {
file := mapFiles[taskNumber%len(mapFiles)]
taskArgs := DoTaskArgs{jobName, file, phase, taskNumber, nOther}
wg.Add(1)
go func(taskArgs DoTaskArgs) {
workerAddr := <-registerChan
print("hello\n")
// _ = call(workerAddr, rpcname, taskArgs, nil)
registerChan <- workerAddr
wg.Done()
}(taskArgs)
}
wg.Wait()
fmt.Printf("Schedule: %v done\n", phase)
}
如果我把 wg.Done()
放在 registerChan <- workerAddr
之前,它就可以正常工作,我不知道为什么。我也尝试过推迟 wg.Done() 但这似乎不起作用,尽管我希望如此。我想我对 go routines 和 channels 的工作方式有一些误解,这让我感到困惑。
因为它在此处停止:
workerAddr := <-registerChan
对于缓冲频道:
要让这个 workerAddr := <-registerChan
工作:通道 registerChan
必须有一个值;否则,代码将在此处停止等待通道。
我以这种方式运行你的代码(尝试this):
package main
import (
"fmt"
"sync"
)
func main() {
registerChan := make(chan int, 1)
for i := 1; i <= 10; i++ {
wg.Add(1)
go fn(i, registerChan)
}
registerChan <- 0 // seed
wg.Wait()
fmt.Println(<-registerChan)
}
func fn(taskArgs int, registerChan chan int) {
workerAddr := <-registerChan
workerAddr += taskArgs
registerChan <- workerAddr
wg.Done()
}
var wg sync.WaitGroup
输出:
55
解释:
此代码使用通道和 10 个 goroutine 加上一个主 goroutine 将 1 加到 10。
希望对您有所帮助。
当你运行这个语句registerChan <- workerAddr
时,如果通道容量已满你不能添加它,它会阻塞。如果你有一个池,比如 10 个 workerAddr,你可以在调用 schedule
之前将它们全部添加到容量为 10 的缓冲通道中。不要在调用后添加,以保证如果你从通道中取值,之后有 space 再次添加它。在 goroutine 的开头使用 defer
是好的。