按顺序从 golang 通道读取
Reading from golang channels in order
我正在尝试通过 go 中的通道实现并行处理和通信。
我基本上尝试解决的是并行处理特定数据,并按顺序获得结果 => 为此目的引入类型 Chunk
(见下文)。
我只是为每个块处理创建新通道并将它们保存在切片中 => 希望在我之后迭代它们时被订购。
我程序的简化版本是 (https://play.golang.org/p/RVtDGgUVCV):
package main
import (
"fmt"
)
type Chunk struct {
from int
to int
}
func main() {
chunks := []Chunk{
Chunk{
from: 0,
to: 2,
},
Chunk{
from: 2,
to: 4,
},
}
outChannels := [](<-chan struct {
string
error
}){}
for _, chunk := range chunks {
outChannels = append(outChannels, processChunk(&chunk))
}
for _, outChannel := range outChannels {
for out := range outChannel {
if out.error != nil {
fmt.Printf("[ERROR] %s", out.error)
return
}
fmt.Printf("[STDOUT] %s", out.string)
}
}
}
func processChunk(c *Chunk) <-chan struct {
string
error
} {
outChannel := make(chan struct {
string
error
})
go func() {
outChannel <- struct {
string
error
}{fmt.Sprintf("from: %d to: %d\n", c.from, c.to), nil}
close(outChannel)
}()
return outChannel
}
我看到的输出是:
[STDOUT] from: 2 to: 4
[STDOUT] from: 2 to: 4
然而我希望看到的是:
[STDOUT] from: 0 to: 2
[STDOUT] from: 2 to: 4
我在这里做错了什么?我没看到。
问题出在 main
的第一个 for
循环中。当您使用 for range
循环时,循环变量(此处为 chunk
)会创建一次,并在每次迭代时为每个切片元素分配一个副本。
当你调用processChunk(&chunk)
时,你传递的是这个循环变量的地址,这个变量的值随着每次迭代而改变。因此,函数 processChunk
总是在 chunks
循环中的最后一项上工作,因为这是 for 循环完成后 *chunk
指向的内容。
要修复,请使用切片索引:
for i := 0; i < len(chunks); i++ {
// pass chunk objects by indexing chunks
outChannels = append(outChannels, processChunk(&chunks[i]))
}
固定码:https://play.golang.org/p/A1_DtkncY_
您可以阅读更多关于 range
here。
我正在尝试通过 go 中的通道实现并行处理和通信。
我基本上尝试解决的是并行处理特定数据,并按顺序获得结果 => 为此目的引入类型 Chunk
(见下文)。
我只是为每个块处理创建新通道并将它们保存在切片中 => 希望在我之后迭代它们时被订购。
我程序的简化版本是 (https://play.golang.org/p/RVtDGgUVCV):
package main
import (
"fmt"
)
type Chunk struct {
from int
to int
}
func main() {
chunks := []Chunk{
Chunk{
from: 0,
to: 2,
},
Chunk{
from: 2,
to: 4,
},
}
outChannels := [](<-chan struct {
string
error
}){}
for _, chunk := range chunks {
outChannels = append(outChannels, processChunk(&chunk))
}
for _, outChannel := range outChannels {
for out := range outChannel {
if out.error != nil {
fmt.Printf("[ERROR] %s", out.error)
return
}
fmt.Printf("[STDOUT] %s", out.string)
}
}
}
func processChunk(c *Chunk) <-chan struct {
string
error
} {
outChannel := make(chan struct {
string
error
})
go func() {
outChannel <- struct {
string
error
}{fmt.Sprintf("from: %d to: %d\n", c.from, c.to), nil}
close(outChannel)
}()
return outChannel
}
我看到的输出是:
[STDOUT] from: 2 to: 4
[STDOUT] from: 2 to: 4
然而我希望看到的是:
[STDOUT] from: 0 to: 2
[STDOUT] from: 2 to: 4
我在这里做错了什么?我没看到。
问题出在 main
的第一个 for
循环中。当您使用 for range
循环时,循环变量(此处为 chunk
)会创建一次,并在每次迭代时为每个切片元素分配一个副本。
当你调用processChunk(&chunk)
时,你传递的是这个循环变量的地址,这个变量的值随着每次迭代而改变。因此,函数 processChunk
总是在 chunks
循环中的最后一项上工作,因为这是 for 循环完成后 *chunk
指向的内容。
要修复,请使用切片索引:
for i := 0; i < len(chunks); i++ {
// pass chunk objects by indexing chunks
outChannels = append(outChannels, processChunk(&chunks[i]))
}
固定码:https://play.golang.org/p/A1_DtkncY_
您可以阅读更多关于 range
here。