Go循环轮询案例分支中的并发未命中

Concurrency in Go loop polling case branch not hit

我正在 Go 中实现一个非常简单的并发程序。有 2 个通道 tododone 用于指示哪个任务已完成。有 5 个 routines 被执行,每个都需要自己的时间来完成。我想每 100 毫秒查看一次正在发生的事情的状态。

不过我试过了,但轮询分支 case <-time.After(100 * time.Millisecond): 似乎从未被调用过。如果我将时间减少到小于 100 毫秒,它有时会被调用(不是以一致的方式)。

我的理解是 go func 在单独的 Go 调度程序线程中执行该方法。因此,我不明白为什么永远不会命中轮询的 case 。我试图将特定案例分支 before/after 移动到另一个但没有任何改变。

有什么建议吗?

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

func concurrent(id int, done chan int, todo chan int) {
    for {
        // doing a task
        t := randInt(50, 100)
        time.Sleep(time.Duration(t) * time.Millisecond)
        done <- id
        // redo again this task
        t = randInt(50, 100)
        time.Sleep(time.Duration(t) * time.Millisecond)
        todo <- id
    }
}

func randInt(min int, max int) int {
    return (min + rand.Intn(max-min))
}

func seedRandom() {
    rand.Seed(time.Now().UTC().UnixNano())
}

func main() {
    seedRandom()

    todo := make(chan int, 5)
    done := make(chan int, 5)

    for i := 0; i < 5; i++ {
        todo <- i
    }

    timeout := make(chan bool)
    go func() {
        time.Sleep(1 * time.Second)
        timeout <- true
    }()

    var mu sync.Mutex
    var output []int

loop:
    for {
        select {
        case <-time.After(100 * time.Millisecond):
            //this branch is never hit?
            fmt.Printf("\nPolling status: %v\n", output)
        case <-timeout:
            fmt.Printf("\nDing ding, time is up!\n")
            break loop
        case id := <-done:
            mu.Lock()
            output = append(output, id)
            fmt.Printf(".") 
            mu.Unlock()
        case id := <-todo:
            go concurrent(id, done, todo)
        }
    }
}

更新 遵循答案后,我在 Go Playgound 中创建了这个版本:https://play.golang.org/p/f08t984BdPt。这按预期工作

在没有 default 个案例的情况下,当多个案例准备就绪时,它会执行其中一个 at random。这不是确定性的。

为了确保 case 运行s,你应该 运行 它在一个单独的 goroutine 中。 (在这种情况下,您必须同步对 output 变量的访问)。

此外你说“我希望每 100 毫秒看到一次”,但是 time.After 只在频道上发送一次。 要定期执行案例,请改用 <-time.NewTicker(100 * time.Millis).C

    var mu sync.Mutex
    var output []int

    go func() {
        ticker := time.NewTicker(100 * time.Millisecond)
        defer ticker.Stop()
        for {
            select {
            case <-ticker.C:
                 // TODO: must synchronize access
                 fmt.Printf("\nPolling status: %v\n", output)
            
            case <-timeout:
                return
            }
        }
    }()

loop:
    for {
        select {
        // other cases
    }
}

你正在创建 5 个 goroutines(func concurrent)并且在你的 select 案例中使用了 todo 通道并且这个通道被用于并发函数所以你最终创建了很多 goroutines

func concurrent(id int, done chan int, todo chan int) {
    for {
        // doing a task
        t := randInt(50, 100)
        time.Sleep(time.Duration(t) * time.Millisecond)
        done <- id
        // redo again this task
        t = randInt(50, 100)
        time.Sleep(time.Duration(t) * time.Millisecond)
        by doing this call you are re-crating the go-routime
        todo <- id
    }
}

当我 运行 你的代码时,我得到了“runtime.NumGoroutine()” “goRoutines 的数量仍然 运行 347”

当你在 for 循环中实现 time.After(100 * time.Millisecond) 时,它会在每次遇到其他情况时重置,在你的情况下 case id := <-todo: && id := <-done: 总是会在 100 毫秒内被击中,这就是为什么你没有得到预期的输出(从你的代码现在的情况来看,我会说 go-routines 的数量将呈指数增长,并且每个 em 都将等待发送值以完成并且很少有人在待办事项通道上,因此您的循环将没有足够的时间(100 毫秒)等待 time.After)

loop:
for {
    select {
    case <-time.After(100 * time.Millisecond): ->this will always get reset ( we can use time.Ticker as it will create a single object that will signal for each and every 100ms https://golang.org/pkg/time/#NewTicker
        //this branch is never hit?
        fmt.Printf("\nPolling status: %v\n", output)
    case <-timeout:
        fmt.Printf("\nDing ding, time is up!\n")
        break loop
    case id := <-done: -> **this will get called**  
        //the mutex call is actually not very usefull as this only get called once per loop and is prefectly thread safe in this code 
        mu.Lock()
        output = append(output, id)
        fmt.Printf(".") 
        mu.Unlock()
    case id := <-todo: -> **this will get called** 
        go concurrent(id, done, todo)
    }
}

}

https://play.golang.org/p/SmlSIUIF5jn -> 我做了一些修改以使您的代码按预期工作..

尝试参考这个以更好地理解 golang 通道和 goroutine

https://tour.golang.org/concurrency/1

time.After(100*time.Millisecond) 创建一个全新的频道,带有一个全新的计时器,该计时器在调用该函数的那一刻开始。

所以,在你的循环中:

    for {
        select {
        // this statement resets the 100ms timer each time you execute the loop :
        case <-time.After(100*time.Millisecond):
            ...

你的分支永远不会被击中,因为 5 个 goroutines 在另一个 cases 上发送信号的时间不到 100ms,这个 time.After(100ms) 永远不会完成。

您需要选择一种方式在迭代之间保持相同的计时器。


这是调整 time.After(...) 调用的一种方法:

    // store the timer in a variable *outside* the loop :
    statusTimer := time.After(100*time.Millisecond)

    for {
        select {
        case <-statusTimer:
            fmt.Printf("\nPolling status: %v\n", output)
            // reset the timer :
            statusTimer = time.After(100*time.Millisecond)
        case <-timeout:
            ...

另一种方法是,正如@blackgreen 建议的那样,使用 time.Ticker :

    statusTicker := time.NewTicker(100*time.Millisecond)

    for {
        select {
        case <-statusTicker.C:
            fmt.Printf("\nPolling status: %v\n", output)
        case <-timeout:
        ...

旁注

一个。如果 output 切片不与其他 goroutine 共享,则不需要在其访问周围使用互斥锁:

   for {
      select {
      case <-statusTicker.C:
          fmt.Printf("\nPolling status: %v\n", output)
      ...
      case i <-done:
          // no race condition here : all happens within the same goroutine,
          // the 'select' statement makes sure that 'case's are executed
          // one at a time
          output = append(output, id)
          fmt.Printf(".")

b。对于您的 timeout 频道:

另一种“发出信号”表明通道发生了某些事件的通用方法是关闭通道而不是在其上发送值:

   // if you don't actually care about the value you send over this channel :
   // you can make it unbuffered, and use the empty 'struct{}' type
   timeout := make(chan struct{})

   go func(){
       // wait for some condition ...
       <-time.After(1*time.Second)
       close(timeout)
   }()


   select {
   case <-statusTimer:
       ...
   case <-timeout: // this branch will also be taken once timeout is closed
       fmt.Printf("\nDing ding, time is up!\n")
       break loop
   case ...

你要避免的错误如下:假设你想在 two goroutines

中使用那个 timeout 通道
  • 如果你通过超时通道发送一个值,只有一个 goroutine 会收到信号——它将“吃掉”通道中的值,而另一个 goroutine 将只有一个阻塞通道,
  • 如果你关闭通道,两个 goroutines 都会正确地“接收”信号