缓冲通道的死锁

Deadlock with buffered channel

我有一些代码是作业调度程序,正在整理来自大量 TCP 套接字的大量数据。此代码是 方法的结果,它在很大程度上适用于 CPU 大量使用并且锁定现在也不是问题。

我的应用程序有时会锁定,并且 "Channel length" 日志是唯一不断重复的东西,因为数据仍然来自我的套接字。但是,计数仍为 5000,并且没有进行下游处理。

认为 问题可能是竞争条件,它可能挂断的线路是 select 中的 channel <- msg jobDispatcher。问题是我不知道如何验证这一点。

我怀疑由于 select 可以随机获取项目,goroutine 正在返回并且 shutdownChan 没有机会处理。然后数据到达 inboundFromTCP 并阻塞!

有人可能会在这里发现一些非常明显的错误。并希望提供解决方案!?

var MessageQueue = make(chan *trackingPacket_v1, 5000)

func init() {
    go jobDispatcher(MessageQueue)
}

func addMessage(trackingPacket *trackingPacket_v1) {
    // Send the packet to the buffered queue!
    log.Println("Channel length:", len(MessageQueue))
    MessageQueue <- trackingPacket
}

func jobDispatcher(inboundFromTCP chan *trackingPacket_v1) {
    var channelMap = make(map[string]chan *trackingPacket_v1)

    // Channel that listens for the strings that want to exit
    shutdownChan := make(chan string)

    for {
        select {
        case msg := <-inboundFromTCP:
            log.Println("Got packet", msg.Avr)
            channel, ok := channelMap[msg.Avr]
            if !ok {
                packetChan := make(chan *trackingPacket_v1)

                channelMap[msg.Avr] = packetChan
                go processPackets(packetChan, shutdownChan, msg.Avr)
                packetChan <- msg
                continue
            }
            channel <- msg
        case shutdownString := <-shutdownChan:
            log.Println("Shutting down:", shutdownString)
            channel, ok := channelMap[shutdownString]
            if ok {
                delete(channelMap, shutdownString)
                close(channel)
            }
        }
    }
}

func processPackets(ch chan *trackingPacket_v1, shutdown chan string, id string) {
    var messages = []*trackingPacket_v1{}

    tickChan := time.NewTicker(time.Second * 1)
    defer tickChan.Stop()

    hasCheckedData := false

    for {
        select {
        case msg := <-ch:
            log.Println("Got a messages for", id)
            messages = append(messages, msg)
            hasCheckedData = false
        case <-tickChan.C:

            messages = cullChanMessages(messages)
            if len(messages) == 0 {
                messages = nil
                shutdown <- id
                return
            }

            // No point running checking when packets have not changed!!
            if hasCheckedData == false {
                processMLATCandidatesFromChan(messages)
                hasCheckedData = true
            }
        case <-time.After(time.Duration(time.Second * 60)):
            log.Println("This channel has been around for 60 seconds which is too much, kill it")
            messages = nil
            shutdown <- id
            return
        }
    }
}

2016 年 1 月 20 日更新

我尝试将 channelMap 作为一个具有互斥锁的全局变量进行返工,但它最终还是死锁了。


稍微调整了代码,仍然锁定,但我不明白这个是怎么做到的!! https://play.golang.org/p/PGpISU4XBJ


更新 01/21/17 在提出一些建议后,我将其放入一个独立的工作示例中,以便人们可以看到。 https://play.golang.org/p/88zT7hBLeD

这是一个很长的 运行 过程,因此需要 运行 在本地机器上运行,因为 playground 会杀死它。希望这有助于查明真相!

我是 Go 新手,但在此代码中

case msg := <-inboundFromTCP:
        log.Println("Got packet", msg.Avr)
        channel, ok := channelMap[msg.Avr]
        if !ok {
            packetChan := make(chan *trackingPacket_v1)

            channelMap[msg.Avr] = packetChan
            go processPackets(packetChan, shutdownChan, msg.Avr)
            packetChan <- msg
            continue
        }
        channel <- msg

你不是在频道里放东西吗(无缓冲?)这里

channel, ok := channelMap[msg.Avr]

所以你不需要先清空那个频道才能在这里添加消息吗?

channel <- msg

就像我说的,我是 Go 的新手,所以我希望我没有犯傻。 :)

我猜你的问题是在另一个 goroutine 正在做 shutdown <- id.

的同时做这个 channel <- msg

由于 channelshutdown 通道都没有缓冲,它们会阻塞等待接收者。他们可以死锁等待另一方可用。

有几种方法可以修复它。您可以使用缓冲区 1 声明这两个通道。

或者不是通过发送关闭消息来发出信号,您可以执行 Google 的上下文包所做的并通过关闭关闭通道来发送关闭信号。查看 https://golang.org/pkg/context/,尤其是 WithCancelWithDeadlineDone 函数。

您或许可以使用上下文删除您自己的关闭通道和超时代码。

JimB 有一个关于关闭 goroutine 的观点,而它可能仍在通道上接收。你应该做的是发送关闭消息(或关闭,或取消上下文)并继续处理消息,直到你的 ch 通道关闭(用 case msg, ok := <-ch: 检测),这将在关闭后发生由发件人收到。

通过这种方式,您可以获得在实际关闭之前传入的所有消息,并且应该避免第二次死锁。