如何在管道中同步 goroutines

How to sync goroutines in pipeline

我需要帮助才能理解为什么以下代码不起作用。我正在构建一个管道并尝试有一个步骤来同步来自两个源通道的值。我的 source/producer 代码如下所示(在我的真实代码中,我从文件中读取文本)。源已排序,但不能保证两个源中的值都存在。

func Source() <-chan int{
    out := make(chan int, 5)

    go func() {
        defer reader.Close()

        out <- 1
        out <- 2
        out <- 3
        out <- 4
        out <- 5
        out <- 7

        close(out)
    }()

    return out
}

同步代码如下所示:

func Sync(a, b <-chan int) <-chan int {
    out := make(chan int)

    go func() {
        av, ak:= <-a
        bv, bk:= <-b

        for ak || bk {

            if !ak || av < bv {
                out <- bv

                bv, bk = <-b
                continue
            }

            if !bk|| bv > av {
                out <- av

                av, ak = <-a
                continue
            }

            out <- av

            av, ak = <-a
            bv, bk = <-b
        }

        close(out)
    }()

    return out
}

我的程序看起来像这样:

func main() {
    os := Source()
    ns := Source()

    for val := range Sync(ns, os) {
        fmt.Printf("[SYNCED] %v \n", val)
    }
}

预期的行为是我的两个源将值缓冲到通道中并且我的同步首先从第一个源读取值。然后从第二。比较它们,如果它们相等,则继续两个通道中的下一个。如果不一样,我们会把后面的值发出来,换成新的值,再做同样的比较。

发生的事情是,对于这些值,同步代码似乎是 运行 几次,我会多次得到 [SYNCED] 1 之类的东西。为什么?

请帮我解决这个问题!

关于 http://play.golang.org/p/uhd3EWrwEo and http://play.golang.org/p/Dqq7-cPaFq -

实际上,int 的代码也会因类似的测试用例而失败:

os := Source([]int{1, 2, 3})
ns := Source([]int{1, 3, 4})

将整数版本放入无限循环。

发生这种情况是因为当检查 !aok || avalue > bvalue 时,它没有考虑如果 aok 为真(某些元素仍在 a 中)和 bok为假(b 中不再有元素),则 avalue > "" 始终为真。因此它会尝试从 b(它是空的)中获取另一个项目并进入无限循环。固定码:http://play.golang.org/p/vYhuOZxRMl