Swift 3 GCD 锁变量和block_and_release 错误

Swift 3 GCD lock variable and block_and_release error

我正在使用 Swift 3 GCD 以便在我的代码中执行一些操作。但是我经常收到 _dispatch_call_block_and_release 错误。我想这个错误背后的原因是因为不同的线程修改同一个变量,但我不确定如何解决问题。这是我的代码和解释:

我有一个在不同线程中访问和修改的变量:

var queueMsgSent: Dictionary<Date,BTCommand>? = nil


func lock(obj: AnyObject, blk:() -> ()) {
    objc_sync_enter(obj)
    blk()
    objc_sync_exit(obj)
}

func addMsgSentToQueue(msg: BTCommands) {

    if queueMsgSent == nil {
        queueMsgSent = Dictionary.init()
    }
    let currentDate = Date()
    lock(obj: queueMsgSent as AnyObject) {
        queueMsgSent?.updateValue(msg, forKey: currentDate)
    }
}

func deleteMsgSentWithId(id: Int) {

    if queueMsgSent == nil { return }

    for (date, msg) in queueMsgSent! {


        if msg.isAck() == false && msg.getId()! == id {
            lock(obj: queueMsgSent as AnyObject) {
                queueMsgSent?.removeValue(forKey: date)
            }
        }
   }
}

func runSent() -> Void {


    while(true) {
        if queueMsgSent == nil { continue }

        for (date, msg) in queueMsgSent! {

            if msg.isSent() == false {
                 mainSearchView?.btCom?.write(str: msg.getCommand()!)
                 msg.setSent(val: true)
                lastMsgSent = Date()
                continue
            }

            if msg.isAck() == true {
                lock(obj: queueMsgSent as AnyObject) {
                    queueMsgSent?.removeValue(forKey: date)
                }
                continue
            }



        }
   }

}

我启动 runSent 方法为:

  DispatchQueue.global().async(execute: runSent)

我需要 runSent 不断检查 queueMsgSent 中的一些条件,并且在必要的主线程 id 中调用其他函数 addMsgSentToQueueue 和 deleteMsgSentWithId。我正在使用一些锁定机制,但它无法正常工作

如果您对 objc 机制不满意,可以看看 here。使用它,您可以为要协调的特定同步创建一个 PThreadMutex,然后使用 mutex.fastsync{ *your code* } 来隔离访问。这是一个使用 OS 级调用的简单、非常轻量级的机制,但您必须注意创建死锁。

您提供的示例取决于对象始终是相同的物理实体,因为对象锁使用地址作为同步内容的 ID。因为您似乎必须到处检查 queueMsgSent 的存在,所以我想知道更新值例程在做什么 - 如果它曾经删除字典,并期望稍后创建它,那么您将有一个潜在的竞争作为不同的线程可以看看不同的同步器。

另外,您在 runSent 中的循环是一个自旋循环 - 如果无事可做,它只会燃烧 CPU 而不是等待工作。也许你可以考虑修改它以使用信号量或一些更合适的机制,让工作人员在无事可做时阻塞?

我强烈建议您使用 Grand Central Dispatch 提供的 DispatchQueue(s),它们使多线程管理更容易。

命令

让我们从您的命令开始 class

class Command {
    let id: String
    var isAck = false
    var isSent = false

    init(id:String) {
        self.id = id
    }
}

队列

现在我们可以构建我们的 Queue class,它将提供以下功能

This is our class should not be confused with the concept of DispatchQueue!

  1. Command推入队列
  2. 从队列中删除 Command
  3. 开始处理所有元素入队列

现在代码:

class Queue {
    typealias Element = (date:Date, command:Command)
    private var storage: [Element] = []
    private let serialQueue = DispatchQueue(label: "serialQueue")

    func push(command:Command) {
        serialQueue.async {
            let newElement = (Date(), command)
            self.storage.append(newElement)
        }
    }

    func delete(by id: String) {
        serialQueue.async {
            guard let index = self.storage.index(where: { [=11=].command.id == id }) else { return }
            self.storage.remove(at: index)
        }
    }

    func startProcessing() {
        Timer.scheduledTimer(withTimeInterval: 10, repeats: true) { timer in
            self.processElements()
        }
    }

    private func processElements() {
        serialQueue.async {
            // send messages where isSent == false
            let shouldBeSent = self.storage.filter { ![=11=].command.isSent }
            for elm in shouldBeSent {
                // TODO: add here code to send message
                elm.command.isSent = true
            }

            // remove from storage message where isAck == true
            self.storage = self.storage.filter { ![=11=].command.isAck }
        }
    }
}

它是如何工作的?

如您所见,storage 属性 是一个包含元组列表的数组,每个元组有 2 个组件:DateCommand

由于 storage 数组由多个线程访问,我们需要确保以线程安全的方式访问它。

所以每次我们访问 storage 时,我们都会将我们的代码包装到这个

serialQueue.async {
    // access self.storage safely
}

我们写入上面所示闭包的每个代码都会添加到我们的串行调度队列

串行队列当时确实处理了 1 个关闭。这就是我们的存储 属性 以线程安全方式访问的原因!

最终考虑

下面的代码块是邪恶的

while true {
    ...
}

它确实使用了所有可用的 CPU 时间,它确实冻结了 UI(当在主线程上执行时)并放电电池。

如您所见,我将其替换为

Timer.scheduledTimer(withTimeInterval: 10, repeats: true) { timer in
    self.processElements()
}

每 10 秒调用一次 self.processElements(),为 CPU 留出足够的时间来处理其他线程。

当然,这取决于您更改秒数以更好地适应您的场景。