从 go 通道流中读取

Reading from a stream of go channels

我试图理解以下从多个频道读取的代码。我在思考这个想法时遇到了一些困难。

    bridge := func(done <-chan interface{}, chanStream <-chan <-chan interface{}) <-chan interface{} {
        outStream := make(chan interface{})
        go func() {
            defer close(outStream)
            for {
                var stream <-chan interface{}
                select {
                case <-done:
                    return
                case maybeSteram, ok := <-chanStream:
                    if ok == false {
                        return
                    }
                    stream = maybeSteram
                }
                for c := range orDone(done, stream) {
                    select {
                    case outStream <- c:
                    case <-done:  // Why we are selection from the done channel here?
                    }
                }
            }
        }()
        return outStream
    }

orDone函数:

    orDone := func(done <-chan interface{}, inStream <-chan interface{}) <-chan interface{} {
        outStream := make(chan interface{})
        go func() {
            defer close(outStream)
            for {
                select {
                case <-done:
                    return
                case v, ok := <-inStream:
                    if ok == false {
                        return
                    }
                    select {
                    case outStream <- v:
                    case <-done:  // Again why we are only reading from this channel? Shouldn't we return from here?
                        // Why we are not retuening from here?
                    }
                }
            }
        }()
        return outStream
    }

如评论中所述,我需要一些帮助来理解我们为什么要在 for c := range orDone(donem, stream) 中进行选择。谁能解释一下这是怎么回事?

提前致谢。

编辑

我从书上拿了代码concurrency in go。完整的代码可以在这里找到:https://github.com/kat-co/concurrency-in-go-src/blob/master/concurrency-patterns-in-go/the-bridge-channel/fig-bridge-channel.go

在这两种情况下,select 都是为了避免阻塞——如果 reader 没有从我们的输出通道读取,写入可能会阻塞(甚至可能永远阻塞),但我们希望goroutine 在 done 通道关闭时终止,无需等待其他任何事情。通过使用 select,它将等到 either 事情发生,然后继续,而不是在检查 done.[= 之前​​无限期地等待写入完成。 19=]

至于另一个问题,"why are we not returning here?":嗯,我们可以。但是我们 不必 ,因为关闭的通道一旦关闭就永远保持可读性(产生无限数量的零值)。所以在那些 "bottom" selects 中什么都不做是可以的;如果 done 实际上已关闭,我们将返回到循环的顶部并点击 case <-done: return 。我想这是一个风格问题。我可能会自己编写 return,但此示例的作者可能希望避免在两个地方处理相同的条件。只要它只是 return 并不重要,但如果您想对 done 执行一些 额外的 操作,则必须更新该行为如果是底部 select returns,则在两个地方,但如果不是底部,则只在一个地方。

拳头我来解释一下done通道的用法。这是 Go 中许多与并发相关的包实现中遵循的常见模式。完成通道的目的是表示计算结束或更像是一个停止信号。通常 done 通道会被许多 go-routines 或代码流中的多个地方监听。一个这样的例子是 Go's builtin "context" package 中的 Done 频道。由于 Go 没有广播之类的东西,即向频道的所有听众发出信号(期望此功能也不是一个好主意),人们只需关闭频道,所有听众将收到 nil 值。在您的情况下,由于第二个 select 语句位于 for 块的末尾,因此代码所有者可能决定继续循环,以便在下一次迭代中,第一个 select 语句侦听完成channel 将从函数中 return。