Go循环轮询案例分支中的并发未命中
Concurrency in Go loop polling case branch not hit
我正在 Go 中实现一个非常简单的并发程序。有 2 个通道 todo
和 done
用于指示哪个任务已完成。有 5 个 routines
被执行,每个都需要自己的时间来完成。我想每 100 毫秒查看一次正在发生的事情的状态。
不过我试过了,但轮询分支 case <-time.After(100 * time.Millisecond):
似乎从未被调用过。如果我将时间减少到小于 100 毫秒,它有时会被调用(不是以一致的方式)。
我的理解是 go func
在单独的 Go 调度程序线程中执行该方法。因此,我不明白为什么永远不会命中轮询的 case
。我试图将特定案例分支 before/after 移动到另一个但没有任何改变。
有什么建议吗?
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func concurrent(id int, done chan int, todo chan int) {
for {
// doing a task
t := randInt(50, 100)
time.Sleep(time.Duration(t) * time.Millisecond)
done <- id
// redo again this task
t = randInt(50, 100)
time.Sleep(time.Duration(t) * time.Millisecond)
todo <- id
}
}
func randInt(min int, max int) int {
return (min + rand.Intn(max-min))
}
func seedRandom() {
rand.Seed(time.Now().UTC().UnixNano())
}
func main() {
seedRandom()
todo := make(chan int, 5)
done := make(chan int, 5)
for i := 0; i < 5; i++ {
todo <- i
}
timeout := make(chan bool)
go func() {
time.Sleep(1 * time.Second)
timeout <- true
}()
var mu sync.Mutex
var output []int
loop:
for {
select {
case <-time.After(100 * time.Millisecond):
//this branch is never hit?
fmt.Printf("\nPolling status: %v\n", output)
case <-timeout:
fmt.Printf("\nDing ding, time is up!\n")
break loop
case id := <-done:
mu.Lock()
output = append(output, id)
fmt.Printf(".")
mu.Unlock()
case id := <-todo:
go concurrent(id, done, todo)
}
}
}
更新 遵循答案后,我在 Go Playgound 中创建了这个版本:https://play.golang.org/p/f08t984BdPt。这按预期工作
在没有 default
个案例的情况下,当多个案例准备就绪时,它会执行其中一个 at random。这不是确定性的。
为了确保 case 运行s,你应该 运行 它在一个单独的 goroutine 中。 (在这种情况下,您必须同步对 output
变量的访问)。
此外你说“我希望每 100 毫秒看到一次”,但是 time.After
只在频道上发送一次。
要定期执行案例,请改用 <-time.NewTicker(100 * time.Millis).C
。
var mu sync.Mutex
var output []int
go func() {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// TODO: must synchronize access
fmt.Printf("\nPolling status: %v\n", output)
case <-timeout:
return
}
}
}()
loop:
for {
select {
// other cases
}
}
你正在创建 5 个 goroutines(func concurrent)并且在你的 select 案例中使用了 todo 通道并且这个通道被用于并发函数所以你最终创建了很多 goroutines
func concurrent(id int, done chan int, todo chan int) {
for {
// doing a task
t := randInt(50, 100)
time.Sleep(time.Duration(t) * time.Millisecond)
done <- id
// redo again this task
t = randInt(50, 100)
time.Sleep(time.Duration(t) * time.Millisecond)
by doing this call you are re-crating the go-routime
todo <- id
}
}
当我 运行 你的代码时,我得到了“runtime.NumGoroutine()”
“goRoutines 的数量仍然 运行 347”
当你在 for 循环中实现 time.After(100 * time.Millisecond) 时,它会在每次遇到其他情况时重置,在你的情况下
case id := <-todo: && id := <-done: 总是会在 100 毫秒内被击中,这就是为什么你没有得到预期的输出(从你的代码现在的情况来看,我会说 go-routines 的数量将呈指数增长,并且每个 em 都将等待发送值以完成并且很少有人在待办事项通道上,因此您的循环将没有足够的时间(100 毫秒)等待 time.After)
loop:
for {
select {
case <-time.After(100 * time.Millisecond): ->this will always get reset ( we can use time.Ticker as it will create a single object that will signal for each and every 100ms https://golang.org/pkg/time/#NewTicker
//this branch is never hit?
fmt.Printf("\nPolling status: %v\n", output)
case <-timeout:
fmt.Printf("\nDing ding, time is up!\n")
break loop
case id := <-done: -> **this will get called**
//the mutex call is actually not very usefull as this only get called once per loop and is prefectly thread safe in this code
mu.Lock()
output = append(output, id)
fmt.Printf(".")
mu.Unlock()
case id := <-todo: -> **this will get called**
go concurrent(id, done, todo)
}
}
}
https://play.golang.org/p/SmlSIUIF5jn -> 我做了一些修改以使您的代码按预期工作..
尝试参考这个以更好地理解 golang 通道和 goroutine
time.After(100*time.Millisecond)
创建一个全新的频道,带有一个全新的计时器,该计时器在调用该函数的那一刻开始。
所以,在你的循环中:
for {
select {
// this statement resets the 100ms timer each time you execute the loop :
case <-time.After(100*time.Millisecond):
...
你的分支永远不会被击中,因为 5 个 goroutines 在另一个 case
s 上发送信号的时间不到 100ms,这个 time.After(100ms)
永远不会完成。
您需要选择一种方式在迭代之间保持相同的计时器。
这是调整 time.After(...)
调用的一种方法:
// store the timer in a variable *outside* the loop :
statusTimer := time.After(100*time.Millisecond)
for {
select {
case <-statusTimer:
fmt.Printf("\nPolling status: %v\n", output)
// reset the timer :
statusTimer = time.After(100*time.Millisecond)
case <-timeout:
...
另一种方法是,正如@blackgreen 建议的那样,使用 time.Ticker
:
statusTicker := time.NewTicker(100*time.Millisecond)
for {
select {
case <-statusTicker.C:
fmt.Printf("\nPolling status: %v\n", output)
case <-timeout:
...
旁注
一个。如果 output
切片不与其他 goroutine 共享,则不需要在其访问周围使用互斥锁:
for {
select {
case <-statusTicker.C:
fmt.Printf("\nPolling status: %v\n", output)
...
case i <-done:
// no race condition here : all happens within the same goroutine,
// the 'select' statement makes sure that 'case's are executed
// one at a time
output = append(output, id)
fmt.Printf(".")
b。对于您的 timeout
频道:
另一种“发出信号”表明通道发生了某些事件的通用方法是关闭通道而不是在其上发送值:
// if you don't actually care about the value you send over this channel :
// you can make it unbuffered, and use the empty 'struct{}' type
timeout := make(chan struct{})
go func(){
// wait for some condition ...
<-time.After(1*time.Second)
close(timeout)
}()
select {
case <-statusTimer:
...
case <-timeout: // this branch will also be taken once timeout is closed
fmt.Printf("\nDing ding, time is up!\n")
break loop
case ...
你要避免的错误如下:假设你想在 two goroutines
中使用那个 timeout
通道
- 如果你通过超时通道发送一个值,只有一个 goroutine 会收到信号——它将“吃掉”通道中的值,而另一个 goroutine 将只有一个阻塞通道,
- 如果你关闭通道,两个 goroutines 都会正确地“接收”信号
我正在 Go 中实现一个非常简单的并发程序。有 2 个通道 todo
和 done
用于指示哪个任务已完成。有 5 个 routines
被执行,每个都需要自己的时间来完成。我想每 100 毫秒查看一次正在发生的事情的状态。
不过我试过了,但轮询分支 case <-time.After(100 * time.Millisecond):
似乎从未被调用过。如果我将时间减少到小于 100 毫秒,它有时会被调用(不是以一致的方式)。
我的理解是 go func
在单独的 Go 调度程序线程中执行该方法。因此,我不明白为什么永远不会命中轮询的 case
。我试图将特定案例分支 before/after 移动到另一个但没有任何改变。
有什么建议吗?
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func concurrent(id int, done chan int, todo chan int) {
for {
// doing a task
t := randInt(50, 100)
time.Sleep(time.Duration(t) * time.Millisecond)
done <- id
// redo again this task
t = randInt(50, 100)
time.Sleep(time.Duration(t) * time.Millisecond)
todo <- id
}
}
func randInt(min int, max int) int {
return (min + rand.Intn(max-min))
}
func seedRandom() {
rand.Seed(time.Now().UTC().UnixNano())
}
func main() {
seedRandom()
todo := make(chan int, 5)
done := make(chan int, 5)
for i := 0; i < 5; i++ {
todo <- i
}
timeout := make(chan bool)
go func() {
time.Sleep(1 * time.Second)
timeout <- true
}()
var mu sync.Mutex
var output []int
loop:
for {
select {
case <-time.After(100 * time.Millisecond):
//this branch is never hit?
fmt.Printf("\nPolling status: %v\n", output)
case <-timeout:
fmt.Printf("\nDing ding, time is up!\n")
break loop
case id := <-done:
mu.Lock()
output = append(output, id)
fmt.Printf(".")
mu.Unlock()
case id := <-todo:
go concurrent(id, done, todo)
}
}
}
更新 遵循答案后,我在 Go Playgound 中创建了这个版本:https://play.golang.org/p/f08t984BdPt。这按预期工作
在没有 default
个案例的情况下,当多个案例准备就绪时,它会执行其中一个 at random。这不是确定性的。
为了确保 case 运行s,你应该 运行 它在一个单独的 goroutine 中。 (在这种情况下,您必须同步对 output
变量的访问)。
此外你说“我希望每 100 毫秒看到一次”,但是 time.After
只在频道上发送一次。
要定期执行案例,请改用 <-time.NewTicker(100 * time.Millis).C
。
var mu sync.Mutex
var output []int
go func() {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// TODO: must synchronize access
fmt.Printf("\nPolling status: %v\n", output)
case <-timeout:
return
}
}
}()
loop:
for {
select {
// other cases
}
}
你正在创建 5 个 goroutines(func concurrent)并且在你的 select 案例中使用了 todo 通道并且这个通道被用于并发函数所以你最终创建了很多 goroutines
func concurrent(id int, done chan int, todo chan int) {
for {
// doing a task
t := randInt(50, 100)
time.Sleep(time.Duration(t) * time.Millisecond)
done <- id
// redo again this task
t = randInt(50, 100)
time.Sleep(time.Duration(t) * time.Millisecond)
by doing this call you are re-crating the go-routime
todo <- id
}
}
当我 运行 你的代码时,我得到了“runtime.NumGoroutine()” “goRoutines 的数量仍然 运行 347”
当你在 for 循环中实现 time.After(100 * time.Millisecond) 时,它会在每次遇到其他情况时重置,在你的情况下 case id := <-todo: && id := <-done: 总是会在 100 毫秒内被击中,这就是为什么你没有得到预期的输出(从你的代码现在的情况来看,我会说 go-routines 的数量将呈指数增长,并且每个 em 都将等待发送值以完成并且很少有人在待办事项通道上,因此您的循环将没有足够的时间(100 毫秒)等待 time.After)
loop:
for {
select {
case <-time.After(100 * time.Millisecond): ->this will always get reset ( we can use time.Ticker as it will create a single object that will signal for each and every 100ms https://golang.org/pkg/time/#NewTicker
//this branch is never hit?
fmt.Printf("\nPolling status: %v\n", output)
case <-timeout:
fmt.Printf("\nDing ding, time is up!\n")
break loop
case id := <-done: -> **this will get called**
//the mutex call is actually not very usefull as this only get called once per loop and is prefectly thread safe in this code
mu.Lock()
output = append(output, id)
fmt.Printf(".")
mu.Unlock()
case id := <-todo: -> **this will get called**
go concurrent(id, done, todo)
}
}
}
https://play.golang.org/p/SmlSIUIF5jn -> 我做了一些修改以使您的代码按预期工作..
尝试参考这个以更好地理解 golang 通道和 goroutine
time.After(100*time.Millisecond)
创建一个全新的频道,带有一个全新的计时器,该计时器在调用该函数的那一刻开始。
所以,在你的循环中:
for {
select {
// this statement resets the 100ms timer each time you execute the loop :
case <-time.After(100*time.Millisecond):
...
你的分支永远不会被击中,因为 5 个 goroutines 在另一个 case
s 上发送信号的时间不到 100ms,这个 time.After(100ms)
永远不会完成。
您需要选择一种方式在迭代之间保持相同的计时器。
这是调整 time.After(...)
调用的一种方法:
// store the timer in a variable *outside* the loop :
statusTimer := time.After(100*time.Millisecond)
for {
select {
case <-statusTimer:
fmt.Printf("\nPolling status: %v\n", output)
// reset the timer :
statusTimer = time.After(100*time.Millisecond)
case <-timeout:
...
另一种方法是,正如@blackgreen 建议的那样,使用 time.Ticker
:
statusTicker := time.NewTicker(100*time.Millisecond)
for {
select {
case <-statusTicker.C:
fmt.Printf("\nPolling status: %v\n", output)
case <-timeout:
...
旁注
一个。如果 output
切片不与其他 goroutine 共享,则不需要在其访问周围使用互斥锁:
for {
select {
case <-statusTicker.C:
fmt.Printf("\nPolling status: %v\n", output)
...
case i <-done:
// no race condition here : all happens within the same goroutine,
// the 'select' statement makes sure that 'case's are executed
// one at a time
output = append(output, id)
fmt.Printf(".")
b。对于您的 timeout
频道:
另一种“发出信号”表明通道发生了某些事件的通用方法是关闭通道而不是在其上发送值:
// if you don't actually care about the value you send over this channel :
// you can make it unbuffered, and use the empty 'struct{}' type
timeout := make(chan struct{})
go func(){
// wait for some condition ...
<-time.After(1*time.Second)
close(timeout)
}()
select {
case <-statusTimer:
...
case <-timeout: // this branch will also be taken once timeout is closed
fmt.Printf("\nDing ding, time is up!\n")
break loop
case ...
你要避免的错误如下:假设你想在 two goroutines
中使用那个timeout
通道
- 如果你通过超时通道发送一个值,只有一个 goroutine 会收到信号——它将“吃掉”通道中的值,而另一个 goroutine 将只有一个阻塞通道,
- 如果你关闭通道,两个 goroutines 都会正确地“接收”信号