缓冲通道的死锁
Deadlock with buffered channel
我有一些代码是作业调度程序,正在整理来自大量 TCP 套接字的大量数据。此代码是 方法的结果,它在很大程度上适用于 CPU 大量使用并且锁定现在也不是问题。
我的应用程序有时会锁定,并且 "Channel length" 日志是唯一不断重复的东西,因为数据仍然来自我的套接字。但是,计数仍为 5000,并且没有进行下游处理。
我 认为 问题可能是竞争条件,它可能挂断的线路是 select
中的 channel <- msg
jobDispatcher
。问题是我不知道如何验证这一点。
我怀疑由于 select 可以随机获取项目,goroutine 正在返回并且 shutdownChan 没有机会处理。然后数据到达 inboundFromTCP 并阻塞!
有人可能会在这里发现一些非常明显的错误。并希望提供解决方案!?
var MessageQueue = make(chan *trackingPacket_v1, 5000)
func init() {
go jobDispatcher(MessageQueue)
}
func addMessage(trackingPacket *trackingPacket_v1) {
// Send the packet to the buffered queue!
log.Println("Channel length:", len(MessageQueue))
MessageQueue <- trackingPacket
}
func jobDispatcher(inboundFromTCP chan *trackingPacket_v1) {
var channelMap = make(map[string]chan *trackingPacket_v1)
// Channel that listens for the strings that want to exit
shutdownChan := make(chan string)
for {
select {
case msg := <-inboundFromTCP:
log.Println("Got packet", msg.Avr)
channel, ok := channelMap[msg.Avr]
if !ok {
packetChan := make(chan *trackingPacket_v1)
channelMap[msg.Avr] = packetChan
go processPackets(packetChan, shutdownChan, msg.Avr)
packetChan <- msg
continue
}
channel <- msg
case shutdownString := <-shutdownChan:
log.Println("Shutting down:", shutdownString)
channel, ok := channelMap[shutdownString]
if ok {
delete(channelMap, shutdownString)
close(channel)
}
}
}
}
func processPackets(ch chan *trackingPacket_v1, shutdown chan string, id string) {
var messages = []*trackingPacket_v1{}
tickChan := time.NewTicker(time.Second * 1)
defer tickChan.Stop()
hasCheckedData := false
for {
select {
case msg := <-ch:
log.Println("Got a messages for", id)
messages = append(messages, msg)
hasCheckedData = false
case <-tickChan.C:
messages = cullChanMessages(messages)
if len(messages) == 0 {
messages = nil
shutdown <- id
return
}
// No point running checking when packets have not changed!!
if hasCheckedData == false {
processMLATCandidatesFromChan(messages)
hasCheckedData = true
}
case <-time.After(time.Duration(time.Second * 60)):
log.Println("This channel has been around for 60 seconds which is too much, kill it")
messages = nil
shutdown <- id
return
}
}
}
2016 年 1 月 20 日更新
我尝试将 channelMap
作为一个具有互斥锁的全局变量进行返工,但它最终还是死锁了。
稍微调整了代码,仍然锁定,但我不明白这个是怎么做到的!!
https://play.golang.org/p/PGpISU4XBJ
更新 01/21/17
在提出一些建议后,我将其放入一个独立的工作示例中,以便人们可以看到。 https://play.golang.org/p/88zT7hBLeD
这是一个很长的 运行 过程,因此需要 运行 在本地机器上运行,因为 playground 会杀死它。希望这有助于查明真相!
我是 Go 新手,但在此代码中
case msg := <-inboundFromTCP:
log.Println("Got packet", msg.Avr)
channel, ok := channelMap[msg.Avr]
if !ok {
packetChan := make(chan *trackingPacket_v1)
channelMap[msg.Avr] = packetChan
go processPackets(packetChan, shutdownChan, msg.Avr)
packetChan <- msg
continue
}
channel <- msg
你不是在频道里放东西吗(无缓冲?)这里
channel, ok := channelMap[msg.Avr]
所以你不需要先清空那个频道才能在这里添加消息吗?
channel <- msg
就像我说的,我是 Go 的新手,所以我希望我没有犯傻。 :)
我猜你的问题是在另一个 goroutine 正在做 shutdown <- id
.
的同时做这个 channel <- msg
由于 channel
和 shutdown
通道都没有缓冲,它们会阻塞等待接收者。他们可以死锁等待另一方可用。
有几种方法可以修复它。您可以使用缓冲区 1 声明这两个通道。
或者不是通过发送关闭消息来发出信号,您可以执行 Google 的上下文包所做的并通过关闭关闭通道来发送关闭信号。查看 https://golang.org/pkg/context/,尤其是 WithCancel
、WithDeadline
和 Done
函数。
您或许可以使用上下文删除您自己的关闭通道和超时代码。
JimB 有一个关于关闭 goroutine 的观点,而它可能仍在通道上接收。你应该做的是发送关闭消息(或关闭,或取消上下文)并继续处理消息,直到你的 ch
通道关闭(用 case msg, ok := <-ch:
检测),这将在关闭后发生由发件人收到。
通过这种方式,您可以获得在实际关闭之前传入的所有消息,并且应该避免第二次死锁。
我有一些代码是作业调度程序,正在整理来自大量 TCP 套接字的大量数据。此代码是
我的应用程序有时会锁定,并且 "Channel length" 日志是唯一不断重复的东西,因为数据仍然来自我的套接字。但是,计数仍为 5000,并且没有进行下游处理。
我 认为 问题可能是竞争条件,它可能挂断的线路是 select
中的 channel <- msg
jobDispatcher
。问题是我不知道如何验证这一点。
我怀疑由于 select 可以随机获取项目,goroutine 正在返回并且 shutdownChan 没有机会处理。然后数据到达 inboundFromTCP 并阻塞!
有人可能会在这里发现一些非常明显的错误。并希望提供解决方案!?
var MessageQueue = make(chan *trackingPacket_v1, 5000)
func init() {
go jobDispatcher(MessageQueue)
}
func addMessage(trackingPacket *trackingPacket_v1) {
// Send the packet to the buffered queue!
log.Println("Channel length:", len(MessageQueue))
MessageQueue <- trackingPacket
}
func jobDispatcher(inboundFromTCP chan *trackingPacket_v1) {
var channelMap = make(map[string]chan *trackingPacket_v1)
// Channel that listens for the strings that want to exit
shutdownChan := make(chan string)
for {
select {
case msg := <-inboundFromTCP:
log.Println("Got packet", msg.Avr)
channel, ok := channelMap[msg.Avr]
if !ok {
packetChan := make(chan *trackingPacket_v1)
channelMap[msg.Avr] = packetChan
go processPackets(packetChan, shutdownChan, msg.Avr)
packetChan <- msg
continue
}
channel <- msg
case shutdownString := <-shutdownChan:
log.Println("Shutting down:", shutdownString)
channel, ok := channelMap[shutdownString]
if ok {
delete(channelMap, shutdownString)
close(channel)
}
}
}
}
func processPackets(ch chan *trackingPacket_v1, shutdown chan string, id string) {
var messages = []*trackingPacket_v1{}
tickChan := time.NewTicker(time.Second * 1)
defer tickChan.Stop()
hasCheckedData := false
for {
select {
case msg := <-ch:
log.Println("Got a messages for", id)
messages = append(messages, msg)
hasCheckedData = false
case <-tickChan.C:
messages = cullChanMessages(messages)
if len(messages) == 0 {
messages = nil
shutdown <- id
return
}
// No point running checking when packets have not changed!!
if hasCheckedData == false {
processMLATCandidatesFromChan(messages)
hasCheckedData = true
}
case <-time.After(time.Duration(time.Second * 60)):
log.Println("This channel has been around for 60 seconds which is too much, kill it")
messages = nil
shutdown <- id
return
}
}
}
2016 年 1 月 20 日更新
我尝试将 channelMap
作为一个具有互斥锁的全局变量进行返工,但它最终还是死锁了。
稍微调整了代码,仍然锁定,但我不明白这个是怎么做到的!! https://play.golang.org/p/PGpISU4XBJ
更新 01/21/17 在提出一些建议后,我将其放入一个独立的工作示例中,以便人们可以看到。 https://play.golang.org/p/88zT7hBLeD
这是一个很长的 运行 过程,因此需要 运行 在本地机器上运行,因为 playground 会杀死它。希望这有助于查明真相!
我是 Go 新手,但在此代码中
case msg := <-inboundFromTCP:
log.Println("Got packet", msg.Avr)
channel, ok := channelMap[msg.Avr]
if !ok {
packetChan := make(chan *trackingPacket_v1)
channelMap[msg.Avr] = packetChan
go processPackets(packetChan, shutdownChan, msg.Avr)
packetChan <- msg
continue
}
channel <- msg
你不是在频道里放东西吗(无缓冲?)这里
channel, ok := channelMap[msg.Avr]
所以你不需要先清空那个频道才能在这里添加消息吗?
channel <- msg
就像我说的,我是 Go 的新手,所以我希望我没有犯傻。 :)
我猜你的问题是在另一个 goroutine 正在做 shutdown <- id
.
channel <- msg
由于 channel
和 shutdown
通道都没有缓冲,它们会阻塞等待接收者。他们可以死锁等待另一方可用。
有几种方法可以修复它。您可以使用缓冲区 1 声明这两个通道。
或者不是通过发送关闭消息来发出信号,您可以执行 Google 的上下文包所做的并通过关闭关闭通道来发送关闭信号。查看 https://golang.org/pkg/context/,尤其是 WithCancel
、WithDeadline
和 Done
函数。
您或许可以使用上下文删除您自己的关闭通道和超时代码。
JimB 有一个关于关闭 goroutine 的观点,而它可能仍在通道上接收。你应该做的是发送关闭消息(或关闭,或取消上下文)并继续处理消息,直到你的 ch
通道关闭(用 case msg, ok := <-ch:
检测),这将在关闭后发生由发件人收到。
通过这种方式,您可以获得在实际关闭之前传入的所有消息,并且应该避免第二次死锁。