如何使用 Combine + Swift 复制 PromiseKit 风格的链式异步流

How to replicate PromiseKit-style chained async flow using Combine + Swift

我在项目中成功使用 PromiseKit,直到 Xcode 11 个测试版破坏了 PK v7。为了减少外部依赖,我决定废弃 PromiseKit。处理链式异步代码的最佳替代品似乎是使用新的 Combine 框架的 Futures。

我正在努力使用 Combine

复制简单的 PK 语法

例如。简单的 PromiseKit 链式异步调用语法

getAccessCodeFromSyncProvider.then{accessCode in startSync(accessCode)}.then{popToRootViewController}.catch{handleError(error)}

I understand:

A Swift standard library implementation of async/await would solve this problem (async/await does not yet exist, despite lots of chatter and involvement from Chris Latter himself)

I could replicate using Semaphores (error-prone?)

flatMap can be used to chain Futures

我想要的异步代码应该能够按需调用,因为它涉及确保用户登录。我正在努力解决两个概念性问题。

  1. 如果我将 Futures 包装在一个方法中,使用 sink 来处理结果,似乎在 sink 调用订阅者之前该方法超出了范围。

  2. 由于 Futures 只执行一次,我担心如果我多次调用该方法,我只会得到第一次调用的旧的、陈旧的结果。要解决这个问题,也许我会使用 PassthroughSubject?这允许按需调用 Publisher。

问题:

  1. 我是否必须保留 调用方法
  2. 如何使用 Swift 标准库复制简单的链式异步,然后将其嵌入到 swift 实例方法中,我可以按需调用以从顶部重新启动链式异步调用? ?
//how is this done using Combine?
func startSync() {
 getAccessCodeFromSyncProvider.then{accessCode in startSync(accessCode)}.catch{\handle error here}
}

这不是您整个问题的真正答案 — 只是关于如何开始使用 Combine 的部分。我将演示如何使用 Combine 框架链接两个异步操作:

    print("start")
    Future<Bool,Error> { promise in
        delay(3) {
            promise(.success(true))
        }
    }
    .handleEvents(receiveOutput: {_ in print("finished 1")})
    .flatMap {_ in
        Future<Bool,Error> { promise in
            delay(3) {
                promise(.success(true))
            }
        }
    }
    .handleEvents(receiveOutput: {_ in print("finished 2")})
    .sink(receiveCompletion: {_ in}, receiveValue: {_ in print("done")})
        .store(in:&self.storage) // storage is a persistent Set<AnyCancellable>

首先,关于持久化问题的答案是:最终订阅者必须持久化,实现方式是使用.store方法。通常你会有一个 Set<AnyCancellable> 作为一个 属性,就像这里一样,你只需调用 .store 作为管道中的最后一件事,将你的订阅者放在那里。

接下来,在这个管道中,我使用 .handleEvents 只是为了在管道移动时给自己一些打印输出。这些只是诊断,不会存在于实际实施中。所有 print 的陈述纯粹是为了让我们可以讨论这里发生的事情。

那么会发生什么?

start
finished 1 // 3 seconds later
finished 2 // 3 seconds later
done

所以你可以看到我们链接了两个异步操作,每个都需要 3 秒。

我们是怎么做到的?我们从 Future 开始,它必须在完成时调用其传入的 promise 方法,并将 Result 作为完成处理程序。之后,我们用.flatMap产生了另一个 Future并投入运行,再做同样的事情。

所以结果并不漂亮(像 PromiseKit)但是它是一个异步操作链。

在 Combine 之前,我们可能已经使用某种 Operation / OperationQueue 依赖项来完成此操作,这可以正常工作,但 PromiseKit 的直接易读性会更少。

更真实一些

说了这么多,这里有一个更现实的重写:

var storage = Set<AnyCancellable>()
func async1(_ promise:@escaping (Result<Bool,Error>) -> Void) {
    delay(3) {
        print("async1")
        promise(.success(true))
    }
}
func async2(_ promise:@escaping (Result<Bool,Error>) -> Void) {
    delay(3) {
        print("async2")
        promise(.success(true))
    }
}
override func viewDidLoad() {
    print("start")
    Future<Bool,Error> { promise in
        self.async1(promise)
    }
    .flatMap {_ in
        Future<Bool,Error> { promise in
            self.async2(promise)
        }
    }
    .sink(receiveCompletion: {_ in}, receiveValue: {_ in print("done")})
        .store(in:&self.storage) // storage is a persistent Set<AnyCancellable>
}

如您所见,我们的未来发布者的想法只需要传递 promise 回调;他们实际上不必是给他们打电话的人。因此可以在任何地方调用 promise 回调,到那时我们才会继续。

因此,您可以很容易地看到如何用真正的异步操作替换人为的 delay,该操作以某种方式控制了此 promise 回调并可以在它完成时调用它。此外,我的承诺结果类型纯粹是人为的,但您可以再次看到它们如何用于在管道中传达有意义的信息。当我说 promise(.success(true)) 时,会导致 true 从管道末端弹出;我们在这里忽略它,但它可能是某种彻头彻尾的有用价值,甚至可能是下一个 Future。

(另请注意,我们可以在链中的任何点插入 .receive(on: DispatchQueue.main) 以确保紧随其后的内容在主线程上启动。)

稍微整洁一些

我还想到,通过将我们的 Future 发布者移至常量,我们可以使语法更简洁,也许更接近 PromiseKit 可爱的简单链。但是,如果您这样做,您可能应该将它们包装在 Deferred 发布者中以防止过早评估。例如:

var storage = Set<AnyCancellable>()
func async1(_ promise:@escaping (Result<Bool,Error>) -> Void) {
    delay(3) {
        print("async1")
        promise(.success(true))
    }
}
func async2(_ promise:@escaping (Result<Bool,Error>) -> Void) {
    delay(3) {
        print("async2")
        promise(.success(true))
    }
}
override func viewDidLoad() {
    print("start")
    let f1 = Deferred{Future<Bool,Error> { promise in
        self.async1(promise)
    }}
    let f2 = Deferred{Future<Bool,Error> { promise in
        self.async2(promise)
    }}
    // this is now extremely neat-looking
    f1.flatMap {_ in f2 }
    .receive(on: DispatchQueue.main)
    .sink(receiveCompletion: {_ in}, receiveValue: {_ in print("done")})
        .store(in:&self.storage) // storage is a persistent Set<AnyCancellable>
}

您可以将此框架用于 Swift 协程,它也可以与 Combine 一起使用 - https://github.com/belozierov/SwiftCoroutine

DispatchQueue.main.startCoroutine {
    let future: Future<Bool, Error>
    let coFuture = future.subscribeCoFuture()
    let bool = try coFuture.await()

}

是正确的,使用 flatMap 来链接承诺。我在使用 PromiseKit 时养成了 returning promises 的习惯,并将其带到了 Combine(returning Futures)。

我发现它使代码更易于阅读。这是 matt 提出的最后一个例子:

var storage = Set<AnyCancellable>()

func async1() -> Future<Bool, Error> {
  Future { promise in
    delay(3) {
      print("async1")
      promise(.success(true))
    }
  }
}

func async2() -> Future<Bool, Error> {
  Future { promise in
    delay(3) {
      print("async2")
      promise(.success(true))
    }
  }
}

override func viewDidLoad() {
  print("start")

  async1()
    .flatMap { _ in async2() }
    .receive(on: DispatchQueue.main)
    .sink(receiveCompletion: {_ in}, receiveValue: {_ in print("done")})
    .store(in:&self.storage) // storage is a persistent Set<AnyCancellable>
}

请注意,AnyPublisher 也将用作 return 值,因此您可以抽象出 Future 并使用 return AnyPublisher<Bool, Error> :

func async2() -> AnyPublisher<Bool, Error> {
  Future { promise in
    delay(3) {
      print("async2")
      promise(.success(true))
    }
  }.eraseToAnyPubilsher()
}

此外,如果您想使用类似 PromiseKit 的语法,这里有一些 Publisher 的扩展

我正在使用它在项目中从 PromiseKit 无缝切换到 Combine

extension Publisher {
    
    func then<T: Publisher>(_ closure: @escaping (Output) -> T) -> Publishers.FlatMap<T, Self>
    where T.Failure == Self.Failure {
        flatMap(closure)
    }
    
    func asVoid() -> Future<Void, Error> {
        return Future<Void, Error> { promise in
            let box = Box()
            let cancellable = self.sink { completion in
                if case .failure(let error) = completion {
                    promise(.failure(error))
                } else if case .finished = completion {
                    box.cancellable = nil
                }
            } receiveValue: { value in
                promise(.success(()))
            }
            box.cancellable = cancellable
        }
    }
    
    @discardableResult
    func done(_ handler: @escaping (Output) -> Void) -> Self {
        let box = Box()
        let cancellable = self.sink(receiveCompletion: {compl in
            if case .finished = compl {
                box.cancellable = nil
            }
        }, receiveValue: {
            handler([=10=])
        })
        box.cancellable = cancellable
        return self
    }
    
    @discardableResult
    func `catch`(_ handler: @escaping (Failure) -> Void) -> Self {
        let box = Box()
        let cancellable = self.sink(receiveCompletion: { compl in
            if case .failure(let failure) = compl {
                handler(failure)
            } else if case .finished = compl {
                box.cancellable = nil
            }
        }, receiveValue: { _ in })
        box.cancellable = cancellable
        return self
    }
    
    @discardableResult
    func finally(_ handler: @escaping () -> Void) -> Self {
        let box = Box()
        let cancellable = self.sink(receiveCompletion: { compl in
            if case .finished = compl {
                handler()
                box.cancellable = nil
            }
        }, receiveValue: { _ in })
        box.cancellable = cancellable
        return self
    }
}

fileprivate class Box {
    var cancellable: AnyCancellable?
}

这是一个使用示例:

func someSync() {
    Future<Bool, Error> { promise in
        delay(3) {
            promise(.success(true))
        }
    }
    .then { result in
        Future<String, Error> { promise in
            promise(.success("111"))
        }
    }
    .done { string in
        print(string)
    }
    .catch { err in
        print(err.localizedDescription)
    }
    .finally {
        print("Finished chain")
    }
}