"Go Concurrency Patterns: Timing out, moving on" 博客中的非阻塞频道
Nonblocking channel in "Go Concurrency Patterns: Timing out, moving on" blog
我阅读了这篇关于如何从多个 conns 获取数据的 blog,我试着尝试一下以了解它是如何工作的。
func Query(conns []int) string {
ch := make(chan string)
go func() {
for m := range ch {
log.Println("message => ",m)
}
}()
for i, conn := range conns {
go func(c int,loop int) {
log.Println("start", loop)
select {
case ch <- get(conn,loop):
log.Println("got", loop)
default:
log.Println("skipped", loop)
}
log.Println("exited", loop)
}(conn,i)
}
log.Println("wait")
time.Sleep(5 * time.Second)
return "done"
}
func get(i int, loop int) string {
log.Println("process", loop)
return fmt.Sprintf("return loop %d", loop)
}
如果我在 select
块内注释 default case
,它将打印所有消息(预期输出)。
但是如果我使用非阻塞通道而不注释掉 default
情况,该通道只会打印一条消息(而其他通道将打印“跳过的”日志消息)。我不明白为什么传入的消息是默认大小写。是否有可能所有消息都进入默认情况?我认为所有消息都会被打印出来,因为 get 函数 returns 立即生效。
的 link
- 运行
go vet
你的代码,你会注意到它抛出警告 loop variable conn captured by func literal
因为你使用了 conn
而不是传递的参数,即 c
。所以我已经在我的代码中修复了它。但这与你的问题正交。
- 让我们了解
select
的工作原理 (go-tour-5, go-tour-6):
* A select blocks until one of its cases can run, then it executes that case.
It chooses one at random if multiple are ready.
* The default case in a select is run if no other case is ready.
所以如果你使用 select 而没有被 for
循环包围(例如 select { ... }
),它可以随机选择任何一个案例,因为多个案例已经准备好了或者它可能会选择默认,因为其他案例还没有准备好。并且 select
阻塞,直到它的一种情况(包括默认)可以 运行 然后它退出。但是如果你将它包含在一个 for 循环中(例如 for { select { ... } }
)并修改你的实现,那么你可以 运行 select 直到它达到你想要它达到的情况。当它发生时,goroutine 产生,退出。
所以你的想法基本上是一种误解。每个 运行 给出不同的结果(有时也一样);没有保证,因为如果多个案例准备就绪,select 可以选择任何案例,或者如果案例没有准备好,则使用 default
个案例。
试试这个程序并了解我更改的部分:
package main
import (
"fmt"
"log"
"time"
)
func query(conns []int) string {
ch := make(chan string)
// receiver
go func() {
for m := range ch {
log.Println("message: ", m)
}
}()
// sender
for i, conn := range conns {
go func(c, loop int) {
log.Println("start: ", loop)
for {
select {
case ch <- get(c, loop):
log.Println("got: ", loop)
log.Println("exited: ", loop)
return
default:
log.Println("skipped: ", loop)
}
}
}(conn, i)
}
log.Println("wait")
time.Sleep(5 * time.Second)
return "done"
}
func get(i, loop int) string {
log.Println("process: ", loop)
return fmt.Sprintf("return loop: %d", loop)
}
func main() {
res := query([]int{1, 2, 3})
fmt.Println(res)
}
运行(s)之一的输出:
2020/08/03 00:50:31 wait
2020/08/03 00:50:31 start: 0
2020/08/03 00:50:31 process: 0
2020/08/03 00:50:31 got: 0
2020/08/03 00:50:31 exited: 0
2020/08/03 00:50:31 start: 1
2020/08/03 00:50:31 process: 1
2020/08/03 00:50:31 skipped: 1
2020/08/03 00:50:31 process: 1
2020/08/03 00:50:31 skipped: 1
2020/08/03 00:50:31 process: 1
2020/08/03 00:50:31 skipped: 1
2020/08/03 00:50:31 start: 2
2020/08/03 00:50:31 process: 2
2020/08/03 00:50:31 skipped: 2
2020/08/03 00:50:31 process: 2
2020/08/03 00:50:31 skipped: 2
2020/08/03 00:50:31 process: 2
2020/08/03 00:50:31 skipped: 2
2020/08/03 00:50:31 process: 2
2020/08/03 00:50:31 skipped: 2
2020/08/03 00:50:31 process: 2
2020/08/03 00:50:31 skipped: 2
2020/08/03 00:50:31 process: 2
2020/08/03 00:50:31 skipped: 2
2020/08/03 00:50:31 process: 1
2020/08/03 00:50:31 skipped: 1
2020/08/03 00:50:31 process: 1
2020/08/03 00:50:31 skipped: 1
2020/08/03 00:50:31 process: 1
2020/08/03 00:50:31 skipped: 1
2020/08/03 00:50:31 process: 1
2020/08/03 00:50:31 skipped: 1
2020/08/03 00:50:31 process: 1
2020/08/03 00:50:31 message: return loop: 0
2020/08/03 00:50:31 process: 2
2020/08/03 00:50:31 got: 2
2020/08/03 00:50:31 exited: 2
2020/08/03 00:50:31 message: return loop: 2
2020/08/03 00:50:31 skipped: 1
2020/08/03 00:50:31 process: 1
2020/08/03 00:50:31 got: 1
2020/08/03 00:50:31 exited: 1
2020/08/03 00:50:31 message: return loop: 1
done
请注意,我收到了接收方 goroutine 收到的所有 return loop
消息。
编辑:
我们可以看到 goroutine 多次使用 default case,这表明通道必须被阻塞(case 还没有准备好)。这是由于通道未缓冲。
因此 ch := make(chan string)
可以修改为通过使其成为缓冲通道而使其本质上是非阻塞的。这是 Go Playground.
上改进版本的 link
package main
import (
"fmt"
"log"
"time"
)
func query(conns []int) string {
// buffered channel with a size of 3
ch := make(chan string, 3)
// receiver
go func() {
for m := range ch {
log.Println("message: ", m)
}
}()
// sender
for i, conn := range conns {
go func(c, loop int) {
log.Println("start: ", loop)
for {
select {
case ch <- get(c, loop):
log.Println("got: ", loop)
log.Println("exited: ", loop)
return
default:
log.Println("skipped: ", loop)
}
}
}(conn, i)
}
log.Println("wait")
time.Sleep(5 * time.Second)
return "done"
}
func get(i, loop int) string {
log.Println("process: ", loop)
return fmt.Sprintf("return loop: %d", loop)
}
func main() {
res := query([]int{1, 2, 3})
fmt.Println(res)
}
运行(s)之一的输出:
2009/11/10 23:00:00 wait
2009/11/10 23:00:00 start: 0
2009/11/10 23:00:00 process: 0
2009/11/10 23:00:00 got: 0
2009/11/10 23:00:00 exited: 0
2009/11/10 23:00:00 start: 1
2009/11/10 23:00:00 process: 1
2009/11/10 23:00:00 got: 1
2009/11/10 23:00:00 exited: 1
2009/11/10 23:00:00 start: 2
2009/11/10 23:00:00 process: 2
2009/11/10 23:00:00 got: 2
2009/11/10 23:00:00 exited: 2
2009/11/10 23:00:00 message: return loop: 0
2009/11/10 23:00:00 message: return loop: 1
2009/11/10 23:00:00 message: return loop: 2
done
我阅读了这篇关于如何从多个 conns 获取数据的 blog,我试着尝试一下以了解它是如何工作的。
func Query(conns []int) string {
ch := make(chan string)
go func() {
for m := range ch {
log.Println("message => ",m)
}
}()
for i, conn := range conns {
go func(c int,loop int) {
log.Println("start", loop)
select {
case ch <- get(conn,loop):
log.Println("got", loop)
default:
log.Println("skipped", loop)
}
log.Println("exited", loop)
}(conn,i)
}
log.Println("wait")
time.Sleep(5 * time.Second)
return "done"
}
func get(i int, loop int) string {
log.Println("process", loop)
return fmt.Sprintf("return loop %d", loop)
}
如果我在 select
块内注释 default case
,它将打印所有消息(预期输出)。
但是如果我使用非阻塞通道而不注释掉 default
情况,该通道只会打印一条消息(而其他通道将打印“跳过的”日志消息)。我不明白为什么传入的消息是默认大小写。是否有可能所有消息都进入默认情况?我认为所有消息都会被打印出来,因为 get 函数 returns 立即生效。
- 运行
go vet
你的代码,你会注意到它抛出警告loop variable conn captured by func literal
因为你使用了conn
而不是传递的参数,即c
。所以我已经在我的代码中修复了它。但这与你的问题正交。 - 让我们了解
select
的工作原理 (go-tour-5, go-tour-6):
* A select blocks until one of its cases can run, then it executes that case.
It chooses one at random if multiple are ready.
* The default case in a select is run if no other case is ready.
所以如果你使用 select 而没有被 for
循环包围(例如 select { ... }
),它可以随机选择任何一个案例,因为多个案例已经准备好了或者它可能会选择默认,因为其他案例还没有准备好。并且 select
阻塞,直到它的一种情况(包括默认)可以 运行 然后它退出。但是如果你将它包含在一个 for 循环中(例如 for { select { ... } }
)并修改你的实现,那么你可以 运行 select 直到它达到你想要它达到的情况。当它发生时,goroutine 产生,退出。
所以你的想法基本上是一种误解。每个 运行 给出不同的结果(有时也一样);没有保证,因为如果多个案例准备就绪,select 可以选择任何案例,或者如果案例没有准备好,则使用 default
个案例。
试试这个程序并了解我更改的部分:
package main
import (
"fmt"
"log"
"time"
)
func query(conns []int) string {
ch := make(chan string)
// receiver
go func() {
for m := range ch {
log.Println("message: ", m)
}
}()
// sender
for i, conn := range conns {
go func(c, loop int) {
log.Println("start: ", loop)
for {
select {
case ch <- get(c, loop):
log.Println("got: ", loop)
log.Println("exited: ", loop)
return
default:
log.Println("skipped: ", loop)
}
}
}(conn, i)
}
log.Println("wait")
time.Sleep(5 * time.Second)
return "done"
}
func get(i, loop int) string {
log.Println("process: ", loop)
return fmt.Sprintf("return loop: %d", loop)
}
func main() {
res := query([]int{1, 2, 3})
fmt.Println(res)
}
运行(s)之一的输出:
2020/08/03 00:50:31 wait
2020/08/03 00:50:31 start: 0
2020/08/03 00:50:31 process: 0
2020/08/03 00:50:31 got: 0
2020/08/03 00:50:31 exited: 0
2020/08/03 00:50:31 start: 1
2020/08/03 00:50:31 process: 1
2020/08/03 00:50:31 skipped: 1
2020/08/03 00:50:31 process: 1
2020/08/03 00:50:31 skipped: 1
2020/08/03 00:50:31 process: 1
2020/08/03 00:50:31 skipped: 1
2020/08/03 00:50:31 start: 2
2020/08/03 00:50:31 process: 2
2020/08/03 00:50:31 skipped: 2
2020/08/03 00:50:31 process: 2
2020/08/03 00:50:31 skipped: 2
2020/08/03 00:50:31 process: 2
2020/08/03 00:50:31 skipped: 2
2020/08/03 00:50:31 process: 2
2020/08/03 00:50:31 skipped: 2
2020/08/03 00:50:31 process: 2
2020/08/03 00:50:31 skipped: 2
2020/08/03 00:50:31 process: 2
2020/08/03 00:50:31 skipped: 2
2020/08/03 00:50:31 process: 1
2020/08/03 00:50:31 skipped: 1
2020/08/03 00:50:31 process: 1
2020/08/03 00:50:31 skipped: 1
2020/08/03 00:50:31 process: 1
2020/08/03 00:50:31 skipped: 1
2020/08/03 00:50:31 process: 1
2020/08/03 00:50:31 skipped: 1
2020/08/03 00:50:31 process: 1
2020/08/03 00:50:31 message: return loop: 0
2020/08/03 00:50:31 process: 2
2020/08/03 00:50:31 got: 2
2020/08/03 00:50:31 exited: 2
2020/08/03 00:50:31 message: return loop: 2
2020/08/03 00:50:31 skipped: 1
2020/08/03 00:50:31 process: 1
2020/08/03 00:50:31 got: 1
2020/08/03 00:50:31 exited: 1
2020/08/03 00:50:31 message: return loop: 1
done
请注意,我收到了接收方 goroutine 收到的所有 return loop
消息。
编辑: 我们可以看到 goroutine 多次使用 default case,这表明通道必须被阻塞(case 还没有准备好)。这是由于通道未缓冲。
因此 ch := make(chan string)
可以修改为通过使其成为缓冲通道而使其本质上是非阻塞的。这是 Go Playground.
package main
import (
"fmt"
"log"
"time"
)
func query(conns []int) string {
// buffered channel with a size of 3
ch := make(chan string, 3)
// receiver
go func() {
for m := range ch {
log.Println("message: ", m)
}
}()
// sender
for i, conn := range conns {
go func(c, loop int) {
log.Println("start: ", loop)
for {
select {
case ch <- get(c, loop):
log.Println("got: ", loop)
log.Println("exited: ", loop)
return
default:
log.Println("skipped: ", loop)
}
}
}(conn, i)
}
log.Println("wait")
time.Sleep(5 * time.Second)
return "done"
}
func get(i, loop int) string {
log.Println("process: ", loop)
return fmt.Sprintf("return loop: %d", loop)
}
func main() {
res := query([]int{1, 2, 3})
fmt.Println(res)
}
运行(s)之一的输出:
2009/11/10 23:00:00 wait
2009/11/10 23:00:00 start: 0
2009/11/10 23:00:00 process: 0
2009/11/10 23:00:00 got: 0
2009/11/10 23:00:00 exited: 0
2009/11/10 23:00:00 start: 1
2009/11/10 23:00:00 process: 1
2009/11/10 23:00:00 got: 1
2009/11/10 23:00:00 exited: 1
2009/11/10 23:00:00 start: 2
2009/11/10 23:00:00 process: 2
2009/11/10 23:00:00 got: 2
2009/11/10 23:00:00 exited: 2
2009/11/10 23:00:00 message: return loop: 0
2009/11/10 23:00:00 message: return loop: 1
2009/11/10 23:00:00 message: return loop: 2
done