如何使用通道在 go 例程之间传递字节片
How to pass byte slice between go routines using channels
我有一个函数可以从 source
读取数据并将它们发送到 destination
。源和目标可以是任何东西,对于这个例子来说,源是数据库(任何 MySQL
、PostgreSQL
...),目标是 distributed Q
(任何... ActiveMQ
, Kafka
).消息以字节为单位存储。
这是主要功能。想法是它将旋转一个新的 go 例程并等待返回消息以供将来处理。
type Message []byte
func (p *ProcessorService) Continue(dictId int) {
level.Info(p.logger).Log("process", "message", "dictId", dictId)
retrieved := make(chan Message)
go func() {
err := p.src.Read(retrieved, strconv.Itoa(p.dictId))
if err != nil {
level.Error(p.logger).Log("process", "read", "message", "err", err)
}
}()
for r := range retrieved {
go func(message Message) {
level.Info(p.logger).Log("message", message)
if len(message) > 0 {
if err := p.dst.sendToQ(message); err != nil {
level.Error(p.logger).Log("failed", "during", "persist", "err", err)
}
} else {
level.Error(p.logger).Log("failed")
}
}(r)
}
}
这是读取函数本身
func (s *Storage) Read(out chan<- Message, opt ...string) error {
// I just skip some basic database read operations here
// but idea is simple, read data from the table / file row by row and
//
for _, value := range dataFromDB {
message, err := value.row
if err == nil {
out <- message
} else {
errorf("Unable to get data %v", err)
out <- make([]byte, 0)
}
}
})
close(out)
if err != nil {
return err
}
return nil
}
如您所见,通信是通过 out chan<- 消息通道完成的。
我对 Continue 功能的关注,特别是这里
for r := range retrieved {
go func(message Message) {
// basically here message and r are pointing to the same underlying array
}
}
收到数据时var r
是一种切片字节。然后它传递给 go func(message Message)
在 go 中所有按值传递的东西,在这种情况下 var r
将作为副本传递给匿名函数,但是它仍然有一个指向底层切片数据的指针。我很好奇在 p.dst.sendToQ(message);
执行期间是否会出现问题,同时读取函数会向 out channel
发送一些内容,导致切片数据结构被新信息覆盖。我应该在传递给匿名函数之前将字节切片 r
复制到新的字节切片中,这样底层数组就会不同吗?我测试了它,但并不能真正导致这种行为。不确定我是多疑还是不得不担心。
p.dst.sendToQ(message)
中的 message
与从数据库获取数据时的 value.row
相同。所以,只要每个 value.row
有一个不同的底层数组,你就应该很好。因此,我建议您检查源代码并确保它不使用公共字节数组并不断重写它。
我有一个函数可以从 source
读取数据并将它们发送到 destination
。源和目标可以是任何东西,对于这个例子来说,源是数据库(任何 MySQL
、PostgreSQL
...),目标是 distributed Q
(任何... ActiveMQ
, Kafka
).消息以字节为单位存储。
这是主要功能。想法是它将旋转一个新的 go 例程并等待返回消息以供将来处理。
type Message []byte
func (p *ProcessorService) Continue(dictId int) {
level.Info(p.logger).Log("process", "message", "dictId", dictId)
retrieved := make(chan Message)
go func() {
err := p.src.Read(retrieved, strconv.Itoa(p.dictId))
if err != nil {
level.Error(p.logger).Log("process", "read", "message", "err", err)
}
}()
for r := range retrieved {
go func(message Message) {
level.Info(p.logger).Log("message", message)
if len(message) > 0 {
if err := p.dst.sendToQ(message); err != nil {
level.Error(p.logger).Log("failed", "during", "persist", "err", err)
}
} else {
level.Error(p.logger).Log("failed")
}
}(r)
}
}
这是读取函数本身
func (s *Storage) Read(out chan<- Message, opt ...string) error {
// I just skip some basic database read operations here
// but idea is simple, read data from the table / file row by row and
//
for _, value := range dataFromDB {
message, err := value.row
if err == nil {
out <- message
} else {
errorf("Unable to get data %v", err)
out <- make([]byte, 0)
}
}
})
close(out)
if err != nil {
return err
}
return nil
}
如您所见,通信是通过 out chan<- 消息通道完成的。 我对 Continue 功能的关注,特别是这里
for r := range retrieved {
go func(message Message) {
// basically here message and r are pointing to the same underlying array
}
}
收到数据时var r
是一种切片字节。然后它传递给 go func(message Message)
在 go 中所有按值传递的东西,在这种情况下 var r
将作为副本传递给匿名函数,但是它仍然有一个指向底层切片数据的指针。我很好奇在 p.dst.sendToQ(message);
执行期间是否会出现问题,同时读取函数会向 out channel
发送一些内容,导致切片数据结构被新信息覆盖。我应该在传递给匿名函数之前将字节切片 r
复制到新的字节切片中,这样底层数组就会不同吗?我测试了它,但并不能真正导致这种行为。不确定我是多疑还是不得不担心。
p.dst.sendToQ(message)
中的 message
与从数据库获取数据时的 value.row
相同。所以,只要每个 value.row
有一个不同的底层数组,你就应该很好。因此,我建议您检查源代码并确保它不使用公共字节数组并不断重写它。