扇入通道到单通道

Fan-in channels to single channel

我有几个通道 c1、c2、c3、c4 ...,如何将这些通道中的所有数据收集到一个通道中? 我的代码:

package main

import (
    "fmt"
    "sync"
)

func putToChannel(c chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    for i := 1; i < 6; i++ {
        c <- i
    }
}

func main() {
    c := make(chan int, 15)
    c1 := make(chan int, 5)
    c2 := make(chan int, 5)
    c3 := make(chan int, 5)
    go func(){c <- <-c1}()
    go func(){c <- <-c2}()
    go func(){c <- <-c3}()
    wg := new(sync.WaitGroup)
    wg.Add(3)
    go putToChannel(c1, wg)
    go putToChannel(c2, wg)
    go putToChannel(c3, wg)
    wg.Wait()
    close(c)
    for i := range c {
        fmt.Println("Receive:", i)
    }

    fmt.Println("Finish")
}

我想合成从 c1、c2 到 c 的所有数据,但它不起作用

This article 有一篇关于如何“扇入”频道的好文章,包括短暂停止。

这些行有问题:

go func(){c <- <-c1}()
go func(){c <- <-c2}()
go func(){c <- <-c3}()

其中每一个都将从 cx 通道接收 一个 值,并将该值发送到 c

你需要一个看起来像这样的方法;

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Start an output goroutine for each input channel in cs.  output
    // copies values from c to out until c is closed, then calls wg.Done.
    output := func(c <-chan int) {
        for n := range c {
            out <- n
        }
        wg.Done()
    }
    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }

    // Start a goroutine to close out once all the output goroutines are
    // done.  This must start after the wg.Add call.
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

此方法依赖于以下事实:当没有更多值要发送时,正在传递给 merge 的通道 cs... 将关闭。

这意味着您还需要更新 putToChannel 方法

func putToChannel(c chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    defer close(c)
    for i := 1; i < 6; i++ {
        c <- i
    }
}

最后一件事值得注意的是,总的来说;尝试将创建并发送到通道的函数和关闭通道的函数封装为相同的函数。这意味着您永远不会尝试在封闭的频道上发送。

而不是:

c1 := make(chan int, 5)
go putToChannel(c1, wg)

你可以做到;

func generator() (<-chan int) {
    c := make(chan int, 5)
    go func() {
        for i := 1; i < 6; i++ {
             c <- i
        }
        close(c)
    }() 
    return c
}

您的主要方法类似于:

func main() {
    var cs []<-chan int

    cs = append(cs, generator())
    cs = append(cs, generator())
    cs = append(cs, generator())

    c := merge(cs...)
    for v := range c {
        fmt.Println(v)
    }
}

我已经稍微修改了你的代码,如下所示。

基本上,在此示例中,需要执行三个步骤:

  1. 将值放入 c1c2 和 ,c3 并在完成放入值后不要忘记关闭它们。
  2. 迭代每个需要合并到 cfor-range 的通道,将值放入 c。在 for-range 循环之后,您需要放置 wg.Done() 以便在每个通道的迭代完成后,您可以向 goroutine 发出最终关闭 c 的信号。如果您碰巧没有关闭要合并到 c 的频道之一,您将收到 all goroutines are asleep - deadlock 错误。
  3. 完成后关闭 c 频道

修改后的代码如下:

package main

import (
    "fmt"
    "sync"
)

func putToChannel(c chan<- int) {
    for i := 1; i < 6; i++ {
        c <- i
    }
    //close the channel after putting values in
    close(c)
}

func main() {
    c := make(chan int, 15)
    c1 := make(chan int, 5)
    c2 := make(chan int, 5)
    c3 := make(chan int, 5)

    output := func(ch <-chan int, wg *sync.WaitGroup) {
        //you need to iterate over the channel
        for n := range ch {
            c <- n
        }
        wg.Done()
    }

    wg := new(sync.WaitGroup)
    wg.Add(3)
    go putToChannel(c1)
    go putToChannel(c2)
    go putToChannel(c3)
    go output(c1, wg)
    go output(c2, wg)
    go output(c3, wg)

    go func() {
        wg.Wait()
        close(c)
    }()
    for i := range c {
        fmt.Println("Receive:", i)
    }

    fmt.Println("Finish")
}

您可以找到更多信息 here

你可以这样做

package main

import (
    "fmt"
    "sync"
)

func putToChannel(c chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    for i := 1; i < 6; i++ {
        c <- i
    }
}

func main() {
    c := make(chan int, 15)
    c1 := make(chan int, 5)
    c2 := make(chan int, 5)
    c3 := make(chan int, 5)
    send := func(c1 chan int, c2 chan int) {
        for {
            value := <-c2
            c1 <- value
        }
    }
    go send(c, c1)
    go send(c, c2)
    go send(c, c3)
    wg := new(sync.WaitGroup)
    wg.Add(3)
    go putToChannel(c1, wg)
    go putToChannel(c2, wg)
    go putToChannel(c3, wg)
    wg.Wait()
    for i := 0; i < 15; i++ {
        fmt.Println("Receive:", <-c)
    }

    fmt.Println("Finish")
}

输出:

Receive: 1
Receive: 2
Receive: 3
Receive: 4
Receive: 5
Receive: 1
Receive: 2
Receive: 3
Receive: 4
Receive: 5
Receive: 1
Receive: 2
Receive: 3
Receive: 4
Receive: 5
Finish

一个清理方法:

假设我们有一个这样的无限发送者:

func msgSender(msg string) <-chan string{
    ch := make(chan string)
    go func() {
        for {
            ch <- msg
            time.Sleep(300*time.Millisecond)
        }
    }()
    return ch
}

注意: msgSender() return 仅接收通道

我们希望在一个频道中加入两个(或更多)发件人:

func fanIn(){
    receiveOnlyCh1:= msgSender("msg1")
    receiveOnlyCh2:= msgSender("msg2")

    fanInCh := make(chan string)
    go func(){
        for{
            select {
            case fromSender1 := <-receiveOnlyCh1:
                fanInCh <- fromSender1
            case fromSender2 := <-receiveOnlyCh2:
                fanInCh <- fromSender2
            }
        }
    }()

    go func(){
        for {
            fmt.Println(<-fanInCh)
        }
    }()
}

输出:

msg1
msg2
msg2
msg1
msg2
msg1
msg2
msg1

https://play.golang.org/p/BhScg6nqphq

涉及 reflect 包的解决方案,以提供可重用的功能。

package main

import (
    "fmt"
    "reflect"
    "sync"
)

func putToChannel(c chan<- int) {
    for i := 1; i < 6; i++ {
        c <- i
    }
    close(c)
}

func main() {

    var cs []interface{}
    for i := 0; i < 3; i++ {
        c := make(chan int, 5)
        cs = append(cs, c)
        go putToChannel(c)
    }

    c := fanin(cs...).(chan int)
    for i := range c {
        fmt.Println("Receive:", i)
    }
    fmt.Println("Finish")

}

https://play.golang.org/p/b_vbLzpCDFo

它看起来稍微简化了,因为它不需要明显使用 WaitGroup。当工作完成时,每个工作人员都对 close 输出通道负有简单的责任。 fanin 会根据先前的先决条件解决问题。

fanin 函数需要一个相同通道类型的空接口切片,它会查找第一项以确定输出通道的类型。然后它创建输出通道,为每个输入通道生成一个 goroutine,并将每个输入通道的项目转发到输出通道。 当所有输入通道也关闭时,涉及 WaitGroup 以关闭输出通道。 请注意,如果输入通道切片为空,它会出现恐慌。

看起来像这样

// fanin fanins all input channels into a single channel.
// fanin is func(input ...chan T) (chan T)
func fanin(inputs ...interface{}) interface{} {
    // note: because we take in variadic of interface, we cannot receive the trailing error channel...
    if len(inputs) < 1 {
        panic("no channels to fanin")
    }
    rinputs := []reflect.Value{}
    for _, input := range inputs {
        rinputs = append(rinputs, reflect.ValueOf(input))
    }
    out := reflect.MakeChan(reflect.ChanOf(reflect.BothDir, rinputs[0].Type().Elem()), 0)
    var wg sync.WaitGroup
    for i := 0; i < len(rinputs); i++ {
        rinput := rinputs[i]
        wg.Add(1)
        go func() {
            defer wg.Done()
            for {
                x, ok := rinput.Recv()
                if !ok {
                    break
                }
                out.Send(x)
            }
        }()
    }
    go func() {
        wg.Wait()
        out.Close()
    }()
    return out.Convert(rinputs[0].Type()).Interface()
}

如果你一直把事情弄得一团糟,你可能会得到如下所示的 fanout 函数。

https://play.golang.org/p/0Zi9h4XPbbV

package main

import (
    "fmt"
    "reflect"
    "sync"
)

func putToChannel() chan int {
    c := make(chan int, 5)
    go func() {
        for i := 1; i < 6; i++ {
            c <- i
        }
        close(c)
    }()
    return c
}

func main() {
    for i := range fanout(3, putToChannel).(chan int) {
        fmt.Println("Receive:", i)
    }

    fmt.Println("Finish")
}

通过这样的实现,fanout 函数看起来像

// fanout spawns n instances of workers.
// worker is a func() chan A
// it fanins all output channels and return a chan A
// fanout is func(n int, worker func() chan A) chan A
func fanout(n int, worker interface{}) interface{} {
    rworker := reflect.ValueOf(worker)
    if rworker.Kind() != reflect.Func {
        panic("not a func")
    }
    var outchans []interface{}
    var in []reflect.Value
    for i := 0; i < n; i++ {
        res := rworker.Call(in)
        outchans = append(outchans, res[0].Interface())
    }
    return fanin(outchans...)
}