"Go Concurrency Patterns: Timing out, moving on" 博客中的非阻塞频道

Nonblocking channel in "Go Concurrency Patterns: Timing out, moving on" blog

我阅读了这篇关于如何从多个 conns 获取数据的 blog,我试着尝试一下以了解它是如何工作的。

func Query(conns []int) string {
    ch := make(chan string)

    go func() {

        for m := range ch {
        log.Println("message => ",m)
        }   

    }()

    for i, conn := range conns {
        go func(c int,loop int) {
            log.Println("start", loop) 

            select {
            case ch <- get(conn,loop):
                log.Println("got", loop)  
            default:
                log.Println("skipped", loop)
            }

            log.Println("exited", loop)
        }(conn,i)
    }


    log.Println("wait")
    time.Sleep(5 * time.Second)
    
    return "done"
}

func get(i int, loop int) string {
   log.Println("process", loop)
   return fmt.Sprintf("return loop %d", loop)
}

如果我在 select 块内注释 default case,它将打印所有消息(预期输出)。

但是如果我使用非阻塞通道而不注释掉 default 情况,该通道只会打印一条消息(而其他通道将打印“跳过的”日志消息)。我不明白为什么传入的消息是默认大小写。是否有可能所有消息都进入默认情况?我认为所有消息都会被打印出来,因为 get 函数 returns 立即生效。

这是 Go Playground

的 link
  1. 运行 go vet 你的代码,你会注意到它抛出警告 loop variable conn captured by func literal 因为你使用了 conn 而不是传递的参数,即 c。所以我已经在我的代码中修复了它。但这与你的问题正交。
  2. 让我们了解 select 的工作原理 (go-tour-5, go-tour-6):
* A select blocks until one of its cases can run, then it executes that case. 
It chooses one at random if multiple are ready.

* The default case in a select is run if no other case is ready.

所以如果你使用 select 而没有被 for 循环包围(例如 select { ... }),它可以随机选择任何一个案例,因为多个案例已经准备好了或者它可能会选择默认,因为其他案例还没有准备好。并且 select 阻塞,直到它的一种情况(包括默认)可以 运行 然后它退出。但是如果你将它包含在一个 for 循环中(例如 for { select { ... } })并修改你的实现,那么你可以 运行 select 直到它达到你想要它达到的情况。当它发生时,goroutine 产生,退出。

所以你的想法基本上是一种误解。每个 运行 给出不同的结果(有时也一样);没有保证,因为如果多个案例准备就绪,select 可以选择任何案例,或者如果案例没有准备好,则使用 default 个案例。

试试这个程序并了解我更改的部分:

package main

import (
    "fmt"
    "log"
    "time"
)

func query(conns []int) string {
    ch := make(chan string)

    // receiver
    go func() {
        for m := range ch {
            log.Println("message: ", m)
        }
    }()

    // sender
    for i, conn := range conns {
        go func(c, loop int) {
            log.Println("start: ", loop)
            for {
                select {
                case ch <- get(c, loop):
                    log.Println("got: ", loop)
                    log.Println("exited: ", loop)
                    return
                default:
                    log.Println("skipped: ", loop)
                }
            }
        }(conn, i)
    }

    log.Println("wait")
    time.Sleep(5 * time.Second)

    return "done"
}

func get(i, loop int) string {
    log.Println("process: ", loop)
    return fmt.Sprintf("return loop: %d", loop)
}

func main() {
    res := query([]int{1, 2, 3})
    fmt.Println(res)
}

Go Playground Link

运行(s)之一的输出:

2020/08/03 00:50:31 wait
2020/08/03 00:50:31 start:  0
2020/08/03 00:50:31 process:  0
2020/08/03 00:50:31 got:  0
2020/08/03 00:50:31 exited:  0
2020/08/03 00:50:31 start:  1
2020/08/03 00:50:31 process:  1
2020/08/03 00:50:31 skipped:  1
2020/08/03 00:50:31 process:  1
2020/08/03 00:50:31 skipped:  1
2020/08/03 00:50:31 process:  1
2020/08/03 00:50:31 skipped:  1
2020/08/03 00:50:31 start:  2
2020/08/03 00:50:31 process:  2
2020/08/03 00:50:31 skipped:  2
2020/08/03 00:50:31 process:  2
2020/08/03 00:50:31 skipped:  2
2020/08/03 00:50:31 process:  2
2020/08/03 00:50:31 skipped:  2
2020/08/03 00:50:31 process:  2
2020/08/03 00:50:31 skipped:  2
2020/08/03 00:50:31 process:  2
2020/08/03 00:50:31 skipped:  2
2020/08/03 00:50:31 process:  2
2020/08/03 00:50:31 skipped:  2
2020/08/03 00:50:31 process:  1
2020/08/03 00:50:31 skipped:  1
2020/08/03 00:50:31 process:  1
2020/08/03 00:50:31 skipped:  1
2020/08/03 00:50:31 process:  1
2020/08/03 00:50:31 skipped:  1
2020/08/03 00:50:31 process:  1
2020/08/03 00:50:31 skipped:  1
2020/08/03 00:50:31 process:  1
2020/08/03 00:50:31 message:  return loop: 0
2020/08/03 00:50:31 process:  2
2020/08/03 00:50:31 got:  2
2020/08/03 00:50:31 exited:  2
2020/08/03 00:50:31 message:  return loop: 2
2020/08/03 00:50:31 skipped:  1
2020/08/03 00:50:31 process:  1
2020/08/03 00:50:31 got:  1
2020/08/03 00:50:31 exited:  1
2020/08/03 00:50:31 message:  return loop: 1
done

请注意,我收到了接收方 goroutine 收到的所有 return loop 消息。


编辑: 我们可以看到 goroutine 多次使用 default case,这表明通道必须被阻塞(case 还没有准备好)。这是由于通道未缓冲。

因此 ch := make(chan string) 可以修改为通过使其成为缓冲通道而使其本质上是非阻塞的。这是 Go Playground.

上改进版本的 link
package main

import (
    "fmt"
    "log"
    "time"
)

func query(conns []int) string {
    // buffered channel with a size of 3
    ch := make(chan string, 3)

    // receiver
    go func() {
        for m := range ch {
            log.Println("message: ", m)
        }
    }()

    // sender
    for i, conn := range conns {
        go func(c, loop int) {
            log.Println("start: ", loop)
            for {
                select {
                case ch <- get(c, loop):
                    log.Println("got: ", loop)
                    log.Println("exited: ", loop)
                    return
                default:
                    log.Println("skipped: ", loop)
                }
            }
        }(conn, i)
    }

    log.Println("wait")
    time.Sleep(5 * time.Second)

    return "done"
}

func get(i, loop int) string {
    log.Println("process: ", loop)
    return fmt.Sprintf("return loop: %d", loop)
}

func main() {
    res := query([]int{1, 2, 3})
    fmt.Println(res)
}

运行(s)之一的输出:

2009/11/10 23:00:00 wait
2009/11/10 23:00:00 start:  0
2009/11/10 23:00:00 process:  0
2009/11/10 23:00:00 got:  0
2009/11/10 23:00:00 exited:  0
2009/11/10 23:00:00 start:  1
2009/11/10 23:00:00 process:  1
2009/11/10 23:00:00 got:  1
2009/11/10 23:00:00 exited:  1
2009/11/10 23:00:00 start:  2
2009/11/10 23:00:00 process:  2
2009/11/10 23:00:00 got:  2
2009/11/10 23:00:00 exited:  2
2009/11/10 23:00:00 message:  return loop: 0
2009/11/10 23:00:00 message:  return loop: 1
2009/11/10 23:00:00 message:  return loop: 2
done