Combine framework serialize async operations

How do I get the asynchronous pipelines that make up the Combine framework to line up synchronously (serially)?

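Suppose I have 50 URLs, and I want to download the corresponding resources one at a time. I know how to do that with Operation / OperationQueue, e.g. using an Operation subclass that doesn't declare itself finished until the download is complete. How would I do the same thing using Combine?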

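At the moment all that occurs to me is to keep a global list of the remaining URLs and pop one off, set up one pipeline for that one download, do the download, and in the sink of the pipeline, repeat. That doesn't seem very Combine-like.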

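I did try making an array of the URLs and mapping it to an array of publishers. I know I can "produce" a publisher and cause it to publish on down the pipeline using flatMap. But then I'm still doing all the downloading simultaneously. There isn't any Combine way to walk the array in a controlled manner, or is there?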

(I did also imagine doing something with Future, but I became hopelessly confused. I'm not used to this way of thinking.)

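In all the other Reactive frameworks this is really easy; you just use concat to concatenate and flatten the results in one step, and then you can reduce the results into a final array. Apple makes this difficult because Publishers.Concatenate has no overload that accepts an array of publishers. There is similar weirdness with Publishers.Merge. I have a feeling this has to do with the fact that they return nested generic publishers instead of just returning a single generic type like the rx Observable. I guess you can just call Concatenate in a loop and then reduce the concatenated results into a single array, but I really hope they address this in the next release. There is certainly a need to concat more than 2 publishers and to merge more than 4 publishers (and the overloads for these two operators aren't even consistent, which is just weird).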
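A minimal sketch of the "call Concatenate in a loop, then reduce into an array" idea: `concatenateMany` is a hypothetical helper (Combine has no operator by that name) that folds the publishers together with `append`, and `collect()` then gathers every value into one array. The inputs are synchronous `Just` and sequence publishers, assumed here only so that the result is deterministic.

```swift
import Combine

// Hypothetical helper, not part of Combine: concatenates any number of
// publishers that share Output and Failure, preserving array order.
// Each publisher is subscribed to only after the previous one finishes.
func concatenateMany<Output, Failure: Error>(
    _ publishers: [AnyPublisher<Output, Failure>]
) -> AnyPublisher<Output, Failure> {
    publishers.reduce(Empty<Output, Failure>().eraseToAnyPublisher()) { chain, next in
        chain.append(next).eraseToAnyPublisher()
    }
}

var collected: [Int] = []

// collect() buffers every value and emits a single array on completion,
// playing the role of "reduce the results into a final array".
let cancellable = concatenateMany([
    Just(1).append(2).eraseToAnyPublisher(),
    [3, 4].publisher.eraseToAnyPublisher(),
    Just(5).eraseToAnyPublisher()
])
.collect()
.sink { collected = $0 }

print(collected)
```

Starting the fold from an `Empty` publisher avoids special-casing the first element; an empty input array simply yields a publisher that finishes immediately.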

EDIT:

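I came back to this and found that you can indeed concatenate an arbitrary array of publishers, and they will emit in sequence. I have no idea why there isn't a function like ConcatenateMany to do this for you, but it looks like, as long as you are willing to use type-erased publishers, it isn't that hard to write one yourself. This example shows that merge emits in temporal order while concat emits in the order of combination: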

import PlaygroundSupport
import SwiftUI
import Combine

let p = Just<Int>(1).append(2).append(3).delay(for: .seconds(0.25), scheduler: RunLoop.main).eraseToAnyPublisher()
let q = Just<Int>(4).append(5).append(6).eraseToAnyPublisher()
let r = Just<Int>(7).append(8).append(9).delay(for: .seconds(0.5), scheduler: RunLoop.main).eraseToAnyPublisher()
let concatenated: AnyPublisher<Int, Never> = [q,r].reduce(p) { total, next in
  total.append(next).eraseToAnyPublisher()
}

var subscriptions = Set<AnyCancellable>()

concatenated
  .sink(receiveValue: { v in
    print("concatenated: \(v)")
  }).store(in: &subscriptions)

Publishers
  .MergeMany([p,q,r])
  .sink(receiveValue: { v in
    print("merge: \(v)")
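  }).store(in: &subscriptions)

// Keep the playground running so the delayed values get delivered.
PlaygroundPage.current.needsIndefiniteExecution = true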

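Here is one page of playground code that depicts a possible approach. The main idea is to transform the async API calls into a chain of Future publishers, thus making a serial pipeline.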

Input: a range of integers from 1 to 10, converted asynchronously into strings on a background queue

Demo of calling the async API directly:

let group = DispatchGroup()
inputValues.forEach {
    group.enter()
    asyncCall(input: $0) { (output, _) in
        print(">> \(output), in \(Thread.current)")
        group.leave()
    }
}
group.wait()

Output:

>> 1, in <NSThread: 0x7fe76264fff0>{number = 4, name = (null)}
>> 3, in <NSThread: 0x7fe762446b90>{number = 3, name = (null)}
>> 5, in <NSThread: 0x7fe7624461f0>{number = 5, name = (null)}
>> 6, in <NSThread: 0x7fe762461ce0>{number = 6, name = (null)}
>> 10, in <NSThread: 0x7fe76246a7b0>{number = 7, name = (null)}
>> 4, in <NSThread: 0x7fe764c37d30>{number = 8, name = (null)}
>> 7, in <NSThread: 0x7fe764c37cb0>{number = 9, name = (null)}
>> 8, in <NSThread: 0x7fe76246b540>{number = 10, name = (null)}
>> 9, in <NSThread: 0x7fe7625164b0>{number = 11, name = (null)}
>> 2, in <NSThread: 0x7fe764c37f50>{number = 12, name = (null)}

Demo of the Combine pipeline:

Output:

>> got 1
>> got 2
>> got 3
>> got 4
>> got 5
>> got 6
>> got 7
>> got 8
>> got 9
>> got 10
>>>> finished with true

Code:

import Cocoa
import Combine
import PlaygroundSupport

// Assume there is some asynchronous API
// (e.g. it processes an Int input value for some time and produces a String result)
func asyncCall(input: Int, completion: @escaping (String, Error?) -> Void) {
    DispatchQueue.global(qos: .background).async {
        sleep(.random(in: 1...5)) // wait for the random async API output
        completion("\(input)", nil)
    }
}

// There are some input values to be processed serially
let inputValues = Array(1...10)

// Prepare one pipeline item based on Future, which turns the async callback API into a publisher
func makeFuture(input: Int) -> AnyPublisher<Bool, Error> {
    Future<String, Error> { promise in
        asyncCall(input: input) { (value, error) in
            if let error = error {
                promise(.failure(error))
            } else {
                promise(.success(value))
            }
        }
    }
    .receive(on: DispatchQueue.main)
    .map {
        print(">> got \($0)") // << side effect of pipeline item
        return true
    }
    .eraseToAnyPublisher()
}

// Create the pipeline, transforming input values into a chain of Future publishers
var subscribers = Set<AnyCancellable>()
let pipeline =
    inputValues
    .reduce(nil as AnyPublisher<Bool, Error>?) { (chain, value) in
        if let chain = chain {
            return chain.flatMap { _ in
                makeFuture(input: value)
            }.eraseToAnyPublisher()
        } else {
            return makeFuture(input: value)
        }
    }

// Execute pipeline
pipeline?
    .sink(receiveCompletion: { _ in
        // << do something on completion if needed
    }) { output in
        print(">>>> finished with \(output)")
    }
    .store(in: &subscribers)

PlaygroundPage.current.needsIndefiniteExecution = true
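One caveat worth knowing with this approach: a Future runs its closure as soon as it is created, not when it is subscribed to. In the pipeline above that is harmless, because every makeFuture call except the first is made inside flatMap. If you build the futures up front instead, wrapping each one in Deferred postpones the work until subscription. A minimal sketch, where `fakeAsyncCall` is a hypothetical stand-in for `asyncCall` that calls back synchronously so the order of events is easy to check:

```swift
import Combine

var started: [Int] = []

// Hypothetical stand-in for asyncCall(input:completion:). It records when the
// work starts and calls back synchronously to keep the sketch deterministic.
func fakeAsyncCall(input: Int, completion: @escaping (String, Error?) -> Void) {
    started.append(input)
    completion("\(input)", nil)
}

// Bridges the callback API to a publisher. Deferred delays creating the
// Future (and therefore starting the work) until someone subscribes.
func makeDeferredFuture(input: Int) -> AnyPublisher<String, Error> {
    Deferred {
        Future<String, Error> { promise in
            fakeAsyncCall(input: input) { value, error in
                if let error = error {
                    promise(.failure(error))
                } else {
                    promise(.success(value))
                }
            }
        }
    }
    .eraseToAnyPublisher()
}

// A bare Future starts its work immediately, with no subscriber attached.
let eagerFuture = Future<String, Error> { promise in
    fakeAsyncCall(input: 1) { value, _ in promise(.success(value)) }
}

// The deferred version does nothing until it is subscribed to.
let deferredFuture = makeDeferredFuture(input: 2)
let startedBeforeSubscribing = started

var received: [String] = []
let cancellable = deferredFuture.sink(
    receiveCompletion: { _ in },
    receiveValue: { received.append($0) }
)

print(startedBeforeSubscribing, started, received)
```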

I've only briefly tested this, but on a first pass it appears that each request waits for the previous request to finish before starting.

I'm posting this solution in search of feedback. Please be critical if this isn't a good solution.

extension Collection where Element: Publisher {

    func serialize() -> AnyPublisher<Element.Output, Element.Failure>? {
        // If the collection is empty, we can't just create an arbitrary publisher
        // so we return nil to indicate that we had nothing to serialize.
        if isEmpty { return nil }

        // We know at this point that it's safe to grab the first publisher.
        let first = self.first!

        // If there was only a single publisher then we can just return it.
        if count == 1 { return first.eraseToAnyPublisher() }

        // We're going to build up the output starting with the first publisher.
        var output = first.eraseToAnyPublisher()

        // We iterate over the rest of the publishers (skipping over the first.)
        for publisher in self.dropFirst() {
            // We build up the output by appending the next publisher.
            output = output.append(publisher).eraseToAnyPublisher()
        }

        return output
    }
}


A more concise version of this solution (courtesy of @matt):

extension Collection where Element: Publisher {
    func serialize() -> AnyPublisher<Element.Output, Element.Failure>? {
        guard let start = self.first else { return nil }
        return self.dropFirst().reduce(start.eraseToAnyPublisher()) {
            $0.append($1).eraseToAnyPublisher()
        }
    }
}
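To check the claim that each publisher only starts after the previous one finishes, one can wrap each element in Deferred and log the moment it is subscribed to. The sketch below repeats the concise serialize() extension and uses synchronous Just publishers, an assumption chosen only so the log is deterministic.

```swift
import Combine

extension Collection where Element: Publisher {
    // Chains the publishers with append, so each one is subscribed to only
    // after the previous one finishes. Returns nil for an empty collection.
    func serialize() -> AnyPublisher<Element.Output, Element.Failure>? {
        guard let start = self.first else { return nil }
        return self.dropFirst().reduce(start.eraseToAnyPublisher()) {
            $0.append($1).eraseToAnyPublisher()
        }
    }
}

var log: [String] = []

// Deferred runs its closure at subscription time, so "start n" marks the
// moment publisher n is actually subscribed to.
let publishers = (1...3).map { n in
    Deferred { () -> Just<Int> in
        log.append("start \(n)")
        return Just(n)
    }
}

let cancellable = publishers.serialize()?
    .sink { log.append("value \($0)") }

print(log)
```

With real downloads in place of `Just`, the same log pattern (each "start" appearing only after the previous "value") is what indicates that the requests are serialized.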

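You can create a custom Subscriber whose receive(_:) returns Subscribers.Demand.max(1). In that case the subscriber requests the next value only after it has received one. The example works with a publisher of Ints, but a random delay in map mimics network traffic :-)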

import PlaygroundSupport
import SwiftUI
import Combine

class MySubscriber: Subscriber {
  typealias Input = String
  typealias Failure = Never

  func receive(subscription: Subscription) {
    print("Received subscription", Thread.current.isMainThread)
    subscription.request(.max(1))
  }

  func receive(_ input: Input) -> Subscribers.Demand {
    print("Received input: \(input)", Thread.current.isMainThread)
    return .max(1)
  }

  func receive(completion: Subscribers.Completion<Never>) {
    DispatchQueue.main.async {
        print("Received completion: \(completion)", Thread.current.isMainThread)
        PlaygroundPage.current.finishExecution()
    }
  }
}

(110...120)
    .publisher.receive(on: DispatchQueue.global())
    .map {
        print(Thread.current.isMainThread, Thread.current)
        usleep(UInt32.random(in: 10000 ... 1000000))
        return String(format: "%02x", $0)
    }
    .subscribe(on: DispatchQueue.main)
    .subscribe(MySubscriber())

print("Hello")

PlaygroundPage.current.needsIndefiniteExecution = true

The playground prints...

Hello
Received subscription true
false <NSThread: 0x600000064780>{number = 5, name = (null)}
Received input: 6e false
false <NSThread: 0x60000007cc80>{number = 9, name = (null)}
Received input: 6f false
false <NSThread: 0x60000007cc80>{number = 9, name = (null)}
Received input: 70 false
false <NSThread: 0x60000007cc80>{number = 9, name = (null)}
Received input: 71 false
false <NSThread: 0x60000007cc80>{number = 9, name = (null)}
Received input: 72 false
false <NSThread: 0x600000064780>{number = 5, name = (null)}
Received input: 73 false
false <NSThread: 0x600000064780>{number = 5, name = (null)}
Received input: 74 false
false <NSThread: 0x60000004dc80>{number = 8, name = (null)}
Received input: 75 false
false <NSThread: 0x60000004dc80>{number = 8, name = (null)}
Received input: 76 false
false <NSThread: 0x60000004dc80>{number = 8, name = (null)}
Received input: 77 false
false <NSThread: 0x600000053400>{number = 3, name = (null)}
Received input: 78 false
Received completion: finished true

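UPDATE: I finally found .flatMap(maxPublishers: ), which forces me to update this interesting topic with a slightly different approach. Please note that I'm using a global queue for scheduling, not just some random delay, just to be sure that receiving a serialized stream is not "random" or "lucky" behavior :-)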

import PlaygroundSupport
import Combine
import Foundation

PlaygroundPage.current.needsIndefiniteExecution = true

let A = (1 ... 9)
    .publisher
    .flatMap(maxPublishers: .max(1)) { value in
        [value].publisher
            .flatMap { value in
                Just(value)
                    .delay(for: .milliseconds(Int.random(in: 0 ... 100)), scheduler: DispatchQueue.global())
        }
}
.sink { value in
    print(value, "A")
}

let B = (1 ... 9)
    .publisher
    .flatMap { value in
        [value].publisher
            .flatMap { value in
                Just(value)
                    .delay(for: .milliseconds(Int.random(in: 0 ... 100)), scheduler: RunLoop.main)
        }
}
.sink { value in
    print("     ",value, "B")
}

prints

1 A
      4 B
      5 B
      7 B
      1 B
      2 B
      8 B
      6 B
2 A
      3 B
      9 B
3 A
4 A
5 A
6 A
7 A
8 A
9 A

Based on what has been written here,

.serialize()?

defined by Clay Ellis in the accepted answer could be replaced with

.publisher.flatMap(maxPublishers: .max(1)){$0}

while the "unserialized" version must use

.publisher.flatMap{$0}
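A sketch of why maxPublishers: .max(1) serializes: flatMap requests only one publisher from upstream and asks for the next one only when the current inner publisher completes. Here PassthroughSubjects stand in for downloads whose completion is driven by hand (an assumption made to keep the timing visible), and handleEvents logs when each inner publisher gets subscribed to.

```swift
import Combine

// Three inner publishers whose values and completion we drive manually,
// standing in for downloads that finish at different times.
let subjects = (0..<3).map { _ in PassthroughSubject<Int, Never>() }
var log: [String] = []

let cancellable = subjects.enumerated().publisher
    // With .max(1), flatMap requests the next subject from upstream only
    // after the current one completes.
    .flatMap(maxPublishers: .max(1)) { pair in
        pair.element.handleEvents(receiveSubscription: { _ in
            log.append("subscribed \(pair.offset)")
        })
    }
    .sink { log.append("value \($0)") }

subjects[0].send(10)
subjects[0].send(completion: .finished)  // lets flatMap take subject 1
subjects[1].send(20)
subjects[1].send(completion: .finished)  // lets flatMap take subject 2
subjects[2].send(30)
subjects[2].send(completion: .finished)

print(log)
```

Removing the `maxPublishers:` argument should, by contrast, subscribe to all three subjects up front, which mirrors the simultaneous downloads seen with a plain `flatMap`.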

A "real world example":

import PlaygroundSupport
import Foundation
import Combine

let path = "postman-echo.com/get"
let urls: [URL] = "... which proves the downloads are happening serially .-)".map(String.init).compactMap { (parameter) in
    var components = URLComponents()
    components.scheme = "https"
    components.path = path
    components.queryItems = [URLQueryItem(name: parameter, value: nil)]
    return components.url
}
//["https://postman-echo.com/get?]
struct Postman: Decodable {
    var args: [String: String]
}


let collection = urls.map { value in
        URLSession.shared.dataTaskPublisher(for: value)
        .tryMap { data, response -> Data in
            return data
        }
        .decode(type: Postman.self, decoder: JSONDecoder())
        .catch {_ in
            Just(Postman(args: [:]))
    }
}

extension Collection where Element: Publisher {
    func serialize() -> AnyPublisher<Element.Output, Element.Failure>? {
        guard let start = self.first else { return nil }
        return self.dropFirst().reduce(start.eraseToAnyPublisher()) {
            return $0.append($1).eraseToAnyPublisher()
        }
    }
}

var streamA = ""
let A = collection
    .publisher.flatMap{$0}

    .sink(receiveCompletion: { (c) in
        print(streamA, "     ", c, "    .publisher.flatMap{$0}")
    }, receiveValue: { (postman) in
        print(postman.args.keys.joined(), terminator: "", to: &streamA)
    })


var streamC = ""
let C = collection
    .serialize()?

    .sink(receiveCompletion: { (c) in
        print(streamC, "     ", c, "    .serialize()?")
    }, receiveValue: { (postman) in
        print(postman.args.keys.joined(), terminator: "", to: &streamC)
    })

var streamD = ""
let D = collection
    .publisher.flatMap(maxPublishers: .max(1)){$0}

    .sink(receiveCompletion: { (c) in
        print(streamD, "     ", c, "    .publisher.flatMap(maxPublishers: .max(1)){$0}")
    }, receiveValue: { (postman) in
        print(postman.args.keys.joined(), terminator: "", to: &streamD)
    })

PlaygroundPage.current.needsIndefiniteExecution = true

prints

.w.h i.c hporves ht edownloadsa erh appeninsg eriall y.-)       finished     .publisher.flatMap{$0}
... which proves the downloads are happening serially .-)       finished     .publisher.flatMap(maxPublishers: .max(1)){$0}
... which proves the downloads are happening serially .-)       finished     .serialize()?

In my opinion it is useful in other scenarios as well. Try using the default value of maxPublishers in the next snippet and compare the results :-)

import Combine

let sequencePublisher = Publishers.Sequence<Range<Int>, Never>(sequence: 0..<Int.max)
let subject = PassthroughSubject<String, Never>()

let handle = subject
    .zip(sequencePublisher.print())
    //.publish
    .flatMap(maxPublishers: .max(1), { (pair)  in
        Just(pair)
    })
    .print()
    .sink { letters, digits in
        print(letters, digits)
    }

"Hello World!".map(String.init).forEach { (s) in
    subject.send(s)
}
subject.send(completion: .finished)

From the original question:

I did try making an array of the URLs and map it to an array of publishers. I know I can "produce" a publisher and cause it to publish on down the pipeline using flatMap. But then I'm still doing all the downloading simultaneously. There isn't any Combine way to walk the array in a controlled manner — or is there?


Here's a toy example to stand in for the actual problem:

let collection = (1 ... 10).map {
    Just($0).delay(
        for: .seconds(Double.random(in:1...5)),
        scheduler: DispatchQueue.main)
        .eraseToAnyPublisher()
}
collection.publisher
    .flatMap() {$0}
    .sink {print($0)}.store(in:&self.storage)

This emits the integers 1 to 10 in random order at random times. The goal is to do something with collection that will cause it to emit the integers 1 to 10 in order.


Now we're going to change just one thing: in the line

.flatMap {$0}

we add the maxPublishers parameter:

let collection = (1 ... 10).map {
    Just($0).delay(
        for: .seconds(Double.random(in:1...5)),
        scheduler: DispatchQueue.main)
        .eraseToAnyPublisher()
}
collection.publisher
    .flatMap(maxPublishers:.max(1)) {$0}
    .sink {print($0)}.store(in:&self.storage)

Presto, we now do emit the integers 1 to 10, in order, with random intervals between them.


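Let's apply this to the original problem. To demonstrate, I need a fairly slow Internet connection and a fairly large resource to download. First, I'll do it with an ordinary .flatMap: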

let eph = URLSessionConfiguration.ephemeral
let session = URLSession(configuration: eph)
let url = "https://photojournal.jpl.nasa.gov/tiff/PIA23172.tif"
let collection = [url, url, url]
    .map {URL(string:$0)!}
    .map {session.dataTaskPublisher(for: $0)
        .eraseToAnyPublisher()
}
collection.publisher.setFailureType(to: URLError.self)
    .handleEvents(receiveOutput: {_ in print("start")})
    .flatMap() {$0}
    .map {$0.data}
    .sink(receiveCompletion: {comp in
        switch comp {
        case .failure(let err): print("error", err)
        case .finished: print("finished")
        }
    }, receiveValue: {_ in print("done")})
    .store(in:&self.storage)

The result is

start
start
start
done
done
done
finished

That shows that we are doing the three downloads simultaneously. Okay, now change

    .flatMap() {$0}

to

    .flatMap(maxPublishers:.max(1)) {$0}

The result now is:

start
done
start
done
start
done
finished

So we are now downloading serially, which is the problem originally to be solved.


append

In keeping with the principle of TIMTOWTDI, we can instead chain the publishers with append to serialize them:

let collection = (1 ... 10).map {
    Just($0).delay(
        for: .seconds(Double.random(in:1...5)),
        scheduler: DispatchQueue.main)
        .eraseToAnyPublisher()
}
let pub = collection.dropFirst().reduce(collection.first!) {
    return $0.append($1).eraseToAnyPublisher()
}

The result is a publisher that serializes the delayed publishers in the original collection. Let's prove it by subscribing to it:

pub.sink {print($0)}.store(in:&self.storage)

Sure enough, the integers now arrive in order (with random intervals between them).


We can encapsulate the creation of pub from a collection of publishers with an extension on Collection, as suggested by Clay Ellis:

extension Collection where Element: Publisher {
    func serialize() -> AnyPublisher<Element.Output, Element.Failure>? {
        guard let start = self.first else { return nil }
        return self.dropFirst().reduce(start.eraseToAnyPublisher()) {
            return $0.append($1).eraseToAnyPublisher()
        }
    }
}

Use flatMap(maxPublishers:transform:) with .max(1), e.g.

func imagesPublisher(for urls: [URL]) -> AnyPublisher<UIImage, URLError> {
    Publishers.Sequence(sequence: urls.map { self.imagePublisher(for: $0) })
        .flatMap(maxPublishers: .max(1)) { $0 }
        .eraseToAnyPublisher()
}

where

func imagePublisher(for url: URL) -> AnyPublisher<UIImage, URLError> {
    URLSession.shared.dataTaskPublisher(for: url)
        .compactMap { UIImage(data: $0.data) }
        .receive(on: RunLoop.main)
        .eraseToAnyPublisher()
}

var imageRequests: AnyCancellable?

func fetchImages() {
    imageRequests = imagesPublisher(for: urls).sink { completion in
        switch completion {
        case .finished:
            print("done")
        case .failure(let error):
            print("failed", error)
        }
    } receiveValue: { image in
        // do whatever you want with the images as they come in
    }
}

The result is:

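But we should recognize that performing them sequentially like that carries a serious performance penalty. For example, if I bump it up to 6 at a time, it is more than twice as fast: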

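Personally, I'd recommend only downloading sequentially if you absolutely must (which, when downloading a series of images/files, is almost certainly not the case). Yes, performing requests concurrently can result in them not finishing in a particular order, but we can just use a structure that is order-independent (e.g. a dictionary rather than a simple array), and the performance gains are so significant that it is generally worth it.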

But if you want them downloaded sequentially, the maxPublishers parameter can achieve that.
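The same parameter also covers the middle ground measured here: .max(n) lets up to n inner publishers run at once. A minimal sketch under assumptions: the hypothetical helper `mergeLimited` takes already-built publishers (for example the per-URL image publishers), and the demo inputs are synchronous `Just` values so the result is deterministic. Because completion order is no longer guaranteed once n > 1, each value is tagged with its index and stored in a dictionary, in the spirit of the order-independent structure suggested above.

```swift
import Combine

// Hypothetical helper: runs at most `maxConcurrent` of the publishers at a
// time. Each output is tagged with its index so the caller can restore the
// original order even though completion order is not guaranteed.
func mergeLimited<P: Publisher>(
    _ publishers: [P],
    maxConcurrent: Int
) -> AnyPublisher<(index: Int, value: P.Output), P.Failure> {
    precondition(maxConcurrent > 0, "maxConcurrent must be positive")
    return publishers.enumerated().publisher
        .setFailureType(to: P.Failure.self)
        .flatMap(maxPublishers: .max(maxConcurrent)) { pair in
            pair.element.map { (index: pair.offset, value: $0) }
        }
        .eraseToAnyPublisher()
}

var byIndex: [Int: String] = [:]

let cancellable = mergeLimited(
    ["a", "b", "c", "d"].map { Just($0) },
    maxConcurrent: 2
)
.sink { byIndex[$0.index] = $0.value }

// Rebuild the original order from the index tags.
let ordered = byIndex.keys.sorted().compactMap { byIndex[$0] }
print(ordered)
```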

What about a dynamic array of URLs, something like a data bus?

      var array: [AnyPublisher<Data, URLError>] = []

      array.append(Task())

      array.publisher
         .flatMap { $0 }
         .sink {

         }
         // by this point the publisher has already finished, so the tasks appended below are never picked up
      array.append(Task())
      array.append(Task())
      array.append(Task())

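Another approach, if you want to collect all the results of the downloads in order to know which ones failed and which didn't, is to write a custom publisher that looks like this: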

extension Publishers {
    struct Serialize<Upstream: Publisher>: Publisher {
        typealias Output = [Result<Upstream.Output, Upstream.Failure>]
        typealias Failure = Never

        let upstreams: [Upstream]

        init<C: Collection>(_ upstreams: C) where C.Element == Upstream {
            self.upstreams = Array(upstreams)
        }

        init(_ upstreams: Upstream...) {
            self.upstreams = upstreams
        }

        func receive<S>(subscriber: S) where S : Subscriber, Self.Failure == S.Failure, Self.Output == S.Input {
            guard let first = upstreams.first else { return Empty().subscribe(subscriber) }
            first
                .map { Result<Upstream.Output, Upstream.Failure>.success($0) }
                .catch { Just(Result<Upstream.Output, Upstream.Failure>.failure($0)) }
                .map { [$0] }
                .append(Serialize(upstreams.dropFirst()))
                .collect()
                .map { $0.flatMap { $0 } }
                .subscribe(subscriber)
        }
    }
}

extension Collection where Element: Publisher {  
    func serializedPublishers() -> Publishers.Serialize<Element> {
        .init(self)
    }
}

The publisher takes the first download task, converts its output/failure to a Result instance, and prepends it to the "recursive" call for the rest of the list.

Usage: Publishers.Serialize(listOfDownloadTasks), or listOfDownloadTasks.serializedPublishers()

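One minor inconvenience of this implementation is that each Result instance needs to be wrapped into an array, just to be flattened three steps later in the pipeline. Perhaps someone can suggest a better alternative.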
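One possible alternative, sketched under the assumption that the flatMap(maxPublishers:) technique from the other answers fits your case: convert each task's output or failure into a Result inside a serial flatMap, then let collect() build the array, which removes the wrap-and-flatten step. `serializedResults()` and `DownloadError` are hypothetical names used only for this sketch. As a side effect, an empty collection yields an empty array rather than no value at all.

```swift
import Combine

// Hypothetical error type used only for the demo.
struct DownloadError: Error, Equatable {
    let id: Int
}

extension Collection where Element: Publisher {
    // Hypothetical alternative to Publishers.Serialize: runs the publishers
    // one at a time and reports every outcome, success or failure, as a Result.
    func serializedResults() -> AnyPublisher<[Result<Element.Output, Element.Failure>], Never> {
        publisher
            .flatMap(maxPublishers: .max(1)) { upstream in
                upstream
                    .map { Result<Element.Output, Element.Failure>.success($0) }
                    .catch { Just(Result<Element.Output, Element.Failure>.failure($0)) }
            }
            .collect()  // emits one array once every task has finished
            .eraseToAnyPublisher()
    }
}

let tasks: [AnyPublisher<Int, DownloadError>] = [
    Just(1).setFailureType(to: DownloadError.self).eraseToAnyPublisher(),
    Fail<Int, DownloadError>(error: DownloadError(id: 2)).eraseToAnyPublisher(),
    Just(3).setFailureType(to: DownloadError.self).eraseToAnyPublisher()
]

var outcomes: [Result<Int, DownloadError>] = []
let cancellable = tasks.serializedResults().sink { outcomes = $0 }
print(outcomes)
```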