使用队列和信号量进行并发和 属性 包装器?

Use queue and semaphore for concurrency and property wrapper?

我正在尝试创建一个线程安全的 属性 包装器。我只能认为 GCD 队列和信号量是最快捷、最可靠的方式。信号量只是性能更高(如果这是真的),还是有其他理由使用一个而不是另一个来实现并发?

下面是原子 属性 包装器的两个变体:

@propertyWrapper
struct Atomic<Value> {
    private var value: Value
    private let queue = DispatchQueue(label: "Atomic serial queue")

    var wrappedValue: Value {
        get { queue.sync { value } }
        set { queue.sync { value = newValue } }
    }

    init(wrappedValue value: Value) {
        self.value = value
    }
}

@propertyWrapper
struct Atomic2<Value> {
    private var value: Value
    private var semaphore = DispatchSemaphore(value: 1)

    var wrappedValue: Value {
        get {
            semaphore.wait()
            let temp = value
            semaphore.signal()
            return temp
        }

        set {
            semaphore.wait()
            value = newValue
            semaphore.signal()
        }
    }

    init(wrappedValue value: Value) {
        self.value = value
    }
}

struct MyStruct {
    @Atomic var counter = 0
    @Atomic2 var counter2 = 0
}

func test() {
    var myStruct = MyStruct()

    DispatchQueue.concurrentPerform(iterations: 1000) {
        myStruct.counter += [=11=]
        myStruct.counter2 += [=11=]
   }
}

如何正确测试和测量它们以了解两种实现之间的差异以及它们是否有效?

FWIW,另一种选择是 reader-writer pattern with concurrent queue,其中读取同步完成,但允许 运行 与其他读取并发,但写入异步完成,但有障碍(即不同时进行任何其他读取或写入):

@propertyWrapper
class Atomic<Value> {
    private var value: Value
    private let queue = DispatchQueue(label: "com.domain.app.atomic", attributes: .concurrent)

    var wrappedValue: Value {
        get { queue.sync { value } }
        set { queue.async(flags: .barrier) { self.value = newValue } }
    }

    init(wrappedValue value: Value) {
        self.value = value
    }
}

还有一个是NSLock:

@propertyWrapper
struct Atomic<Value> {
    private var value: Value
    private var lock = NSLock()

    var wrappedValue: Value {
        get { lock.synchronized { value } }
        set { lock.synchronized { value = newValue } }
    }

    init(wrappedValue value: Value) {
        self.value = value
    }
}

哪里

extension NSLocking {
    func synchronized<T>(block: () throws -> T) rethrows -> T {
        lock()
        defer { unlock() }
        return try block()
    }
}

或者你可以使用非公平锁:

@propertyWrapper
struct SynchronizedUnfairLock<Value> {
    private var value: Value
    private var lock = UnfairLock()

    var wrappedValue: Value {
        get { lock.synchronized { value } }
        set { lock.synchronized { value = newValue } }
    }

    init(wrappedValue value: Value) {
        self.value = value
    }
}

在哪里

// One should not use `os_unfair_lock` directly in Swift (because Swift
// can move `struct` types), so we'll wrap it in a `UnsafeMutablePointer`.
// See https://github.com/apple/swift/blob/88b093e9d77d6201935a2c2fb13f27d961836777/stdlib/public/Darwin/Foundation/Publishers%2BLocking.swift#L18
// for stdlib example of this pattern.

final class UnfairLock: NSLocking {
    private let unfairLock: UnsafeMutablePointer<os_unfair_lock> = {
        let pointer = UnsafeMutablePointer<os_unfair_lock>.allocate(capacity: 1)
        pointer.initialize(to: os_unfair_lock())
        return pointer
    }()

    deinit {
        unfairLock.deinitialize(count: 1)
        unfairLock.deallocate()
    }

    func lock() {
        os_unfair_lock_lock(unfairLock)
    }

    func tryLock() -> Bool {
        os_unfair_lock_trylock(unfairLock)
    }

    func unlock() {
        os_unfair_lock_unlock(unfairLock)
    }
}

我们应该认识到,虽然这些以及您的提供了原子性,但您必须小心,因为根据您的使用方式,它可能不是线程安全的。

考虑这个简单的实验,我们将一个整数递增一百万次:

func threadSafetyExperiment() {
    @Atomic var foo = 0

    DispatchQueue.global().async {
        DispatchQueue.concurrentPerform(iterations: 10_000_000) { _ in
            foo += 1
        }
        print(foo)
    }
}

您希望 foo 等于 10,000,000,但事实并非如此。这是因为“检索值并递增并保存”的整个交互需要包装在一个同步机制中。

但是你可以添加一个原子增量方法:

extension Atomic where Value: Numeric {
    mutating func increment(by increment: Value) {
        lock.synchronized { value += increment }
    }
}

然后这个工作正常:

func threadSafetyExperiment() {
    @Atomic var foo = 0

    DispatchQueue.global().async {
        DispatchQueue.concurrentPerform(iterations: iterations) { _ in
            _foo.increment(by: 1)
        }
        print(foo)
    }
}

How can they be properly tested and measured to see the difference between the two implementations and if they even work?

一些想法:

  • 我建议进行 1,000 次以上的迭代。您希望进行足够多的迭代,以便以秒而不是毫秒来衡量结果。我在示例中使用了千万次迭代。

  • 单元测试框架非常适合使用 measure 方法测试正确性以及测量性能(每个单元测试重复性能测试 10 次,结果将被单元测试报告捕获):

    因此,创建一个带有单元测试目标的项目(或者如果需要,将单元测试目标添加到现有项目),然后创建单元测试,并使用 command 执行它们+u.

  • 如果您为目标编辑方案,您可以选择随机化测试顺序,以确保它们执行的顺序不会影响性能:

    我也会让测试目标使用发布版本,以确保您测试的是优化版本。

  • 不用说,虽然我通过 运行ning 10m 次迭代对锁进行压力测试,每次迭代递增 1,但效率非常低。每个线程上根本没有足够的工作来证明线程处理的开销是合理的。人们通常会跨越数据集并在每个线程中进行更多迭代,并减少同步次数。

    这的实际含义是,在设计良好的并行算法中,您做了足够的工作来证明多线程的合理性,您正在减少正在发生的同步数量。因此,不同同步技术中的微小差异是不可观察的。如果同步机制有明显的性能差异,这可能表明并行化算法存在更深层次的问题。专注于减少同步,而不是使同步更快。