如何在go中多路复用通道输出
How to multiplex channel output in go
我正在寻找在 go 中多路复用某些通道输出的解决方案。
我有一个数据源,它是从 io.Reader
中读取的,我发送到单个通道。另一方面,我有一个从通道读取的 websocket 请求处理程序。现在碰巧有两个客户端创建了一个 websocket 连接,它们都从同一个通道读取,但每个客户端都只收到了一部分消息。
代码示例(简化):
func (b *Bootloader) ReadLog() (<-chan []byte, error) {
if b.logCh != nil {
logrus.Warn("ReadLog called while channel already exists!")
return b.logCh, nil // This is where we get problems
}
b.logCh = make(chan []byte, 0)
go func() {
buf := make([]byte, 1024)
for {
n, err := b.p.Read(buf)
if err == nil {
msg := make([]byte, n)
copy(msg, buf[:n])
b.logCh <- msg
} else {
break
}
}
close(b.logCh)
b.logCh = nil
}()
return b.logCh, nil
}
现在当 ReadLog()
被调用两次时,第二次调用只是 returns 第一次调用时创建的频道,这导致了上述问题。
问题是:如何正确复用?
关心发送站点或接收站点的多路复用是否better/easier/more ideomatic?
我应该对接收器隐藏频道并使用回调吗?
我现在有点卡住了。欢迎任何提示。
多路复用非常简单:将要多路复用的通道切片,启动一个从原始通道读取的 goroutine,并将每条消息复制到切片中的每个通道:
// Really this should be in Bootloader but this is just an example
var consumers []chan []byte
func (b *Bootloader) multiplex() {
// We'll use a sync.once to make sure we don't start a bunch of these.
sync.Once(func(){
go func() {
// Every time a message comes over the channel...
for v := range b.logCh {
// Loop over the consumers...
for _,cons := range consumers {
// Send each one the message
cons <- v
}
}
}()
})
}
我正在寻找在 go 中多路复用某些通道输出的解决方案。
我有一个数据源,它是从 io.Reader
中读取的,我发送到单个通道。另一方面,我有一个从通道读取的 websocket 请求处理程序。现在碰巧有两个客户端创建了一个 websocket 连接,它们都从同一个通道读取,但每个客户端都只收到了一部分消息。
代码示例(简化):
func (b *Bootloader) ReadLog() (<-chan []byte, error) {
if b.logCh != nil {
logrus.Warn("ReadLog called while channel already exists!")
return b.logCh, nil // This is where we get problems
}
b.logCh = make(chan []byte, 0)
go func() {
buf := make([]byte, 1024)
for {
n, err := b.p.Read(buf)
if err == nil {
msg := make([]byte, n)
copy(msg, buf[:n])
b.logCh <- msg
} else {
break
}
}
close(b.logCh)
b.logCh = nil
}()
return b.logCh, nil
}
现在当 ReadLog()
被调用两次时,第二次调用只是 returns 第一次调用时创建的频道,这导致了上述问题。
问题是:如何正确复用?
关心发送站点或接收站点的多路复用是否better/easier/more ideomatic?
我应该对接收器隐藏频道并使用回调吗?
我现在有点卡住了。欢迎任何提示。
多路复用非常简单:将要多路复用的通道切片,启动一个从原始通道读取的 goroutine,并将每条消息复制到切片中的每个通道:
// Really this should be in Bootloader but this is just an example
var consumers []chan []byte
func (b *Bootloader) multiplex() {
// We'll use a sync.once to make sure we don't start a bunch of these.
sync.Once(func(){
go func() {
// Every time a message comes over the channel...
for v := range b.logCh {
// Loop over the consumers...
for _,cons := range consumers {
// Send each one the message
cons <- v
}
}
}()
})
}