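Combine framework serialize async operations
How can I get the asynchronous pipelines that constitute the Combine framework to line up synchronously (serially)?
Suppose I have 50 URLs from which I want to download the corresponding resources, and let's say I want to do it one at a time. I know how to do that with Operation / OperationQueue, e.g. using an Operation subclass that doesn't declare itself finished until the download is complete. How would I do the same thing using Combine?
At the moment all that occurs to me is to keep a global list of the remaining URLs and pop one off, set up that one pipeline for one download, do the download, and in the sink of the pipeline, repeat. That doesn't seem very Combine-like.
I did try making an array of the URLs and mapping it to an array of publishers. I know I can "produce" a publisher and cause it to publish on down the pipeline using flatMap. But then I'm still doing all the downloading simultaneously. There isn't any Combine way to walk the array in a controlled manner, or is there?
(I also imagined doing something with Future, but I became hopelessly confused. I'm not used to this way of thinking.)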
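In all other Reactive frameworks this is really easy; you just use concat to concatenate and flatten the results in one step, and then you can reduce the results into a final array. Apple makes this difficult because Publishers.Concatenate has no overload that accepts an array of publishers. There is similar weirdness with Publishers.Merge. I have a feeling this has to do with the fact that they return nested generic publishers instead of just returning a single generic type like an Rx Observable. I guess you can call Concatenate in a loop and then reduce the concatenated results into a single array, but I really hope they address this in the next release. There is certainly a need to concatenate more than 2 publishers and to merge more than 4 publishers (and the overloads for these two operators aren't even consistent, which is weird).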
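The "call Concatenate in a loop, then reduce into one array" idea might be sketched like this. It is only a sketch under simple assumptions: Just values stand in for real downloads, the array is folded pairwise with Publishers.Concatenate(prefix:suffix:), and collect() then gathers every value into a single array.

```swift
import Combine

// Stand-ins for real work: each publisher emits one value and then finishes.
let parts: [AnyPublisher<Int, Never>] = [1, 2, 3, 4, 5].map {
    Just($0).eraseToAnyPublisher()
}

// Fold the array pairwise with Publishers.Concatenate. Each suffix is only
// subscribed after its prefix finishes, so the parts run one after another.
let concatenatedAll: AnyPublisher<Int, Never> = parts.dropFirst().reduce(
    parts.first ?? Empty<Int, Never>().eraseToAnyPublisher()
) { prefix, suffix in
    Publishers.Concatenate(prefix: prefix, suffix: suffix).eraseToAnyPublisher()
}

// collect() buffers every value until upstream finishes, then emits one array.
var collected: [Int] = []
let cancellable = concatenatedAll
    .collect()
    .sink { collected = $0 }
```

Because Just emits synchronously, collected already holds [1, 2, 3, 4, 5] when the sink call returns; with real network publishers the array would arrive once the last download finishes.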
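EDIT:
I came back to this and found that you can indeed concatenate an arbitrary array of publishers, and they will emit in sequence. I have no idea why there isn't a function like ConcatenateMany to do this for you, but it looks like as long as you're willing to use a type-erased publisher, it isn't that hard to write one yourself. This example shows that merge emits in temporal order while concat emits in the order of combination: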
import PlaygroundSupport
import SwiftUI
import Combine
let p = Just<Int>(1).append(2).append(3).delay(for: .seconds(0.25), scheduler: RunLoop.main).eraseToAnyPublisher()
let q = Just<Int>(4).append(5).append(6).eraseToAnyPublisher()
let r = Just<Int>(7).append(8).append(9).delay(for: .seconds(0.5), scheduler: RunLoop.main).eraseToAnyPublisher()
let concatenated: AnyPublisher<Int, Never> = [q,r].reduce(p) { total, next in
total.append(next).eraseToAnyPublisher()
}
var subscriptions = Set<AnyCancellable>()
concatenated
.sink(receiveValue: { v in
print("concatenated: \(v)")
}).store(in: &subscriptions)
Publishers
.MergeMany([p,q,r])
.sink(receiveValue: { v in
print("merge: \(v)")
}).store(in: &subscriptions)
PlaygroundPage.current.needsIndefiniteExecution = true
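Here is a one-page playground that depicts a possible approach. The main idea is to transform async API calls into a chain of Future publishers, thus making a serial pipeline.
Input: a range of integers from 1 to 10, converted asynchronously into strings on a background queue.
Demo of calling the async API directly: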
let group = DispatchGroup()
inputValues.forEach {
group.enter()
asyncCall(input: $0) { (output, _) in
print(">> \(output), in \(Thread.current)")
group.leave()
}
}
group.wait()
Output:
>> 1, in <NSThread: 0x7fe76264fff0>{number = 4, name = (null)}
>> 3, in <NSThread: 0x7fe762446b90>{number = 3, name = (null)}
>> 5, in <NSThread: 0x7fe7624461f0>{number = 5, name = (null)}
>> 6, in <NSThread: 0x7fe762461ce0>{number = 6, name = (null)}
>> 10, in <NSThread: 0x7fe76246a7b0>{number = 7, name = (null)}
>> 4, in <NSThread: 0x7fe764c37d30>{number = 8, name = (null)}
>> 7, in <NSThread: 0x7fe764c37cb0>{number = 9, name = (null)}
>> 8, in <NSThread: 0x7fe76246b540>{number = 10, name = (null)}
>> 9, in <NSThread: 0x7fe7625164b0>{number = 11, name = (null)}
>> 2, in <NSThread: 0x7fe764c37f50>{number = 12, name = (null)}
Demo of the Combine pipeline:
Output:
>> got 1
>> got 2
>> got 3
>> got 4
>> got 5
>> got 6
>> got 7
>> got 8
>> got 9
>> got 10
>>>> finished with true
Code:
import Cocoa
import Combine
import PlaygroundSupport
// Assume there is some asynchronous API
// (e.g. it processes an Int input value for some time and produces a String result)
func asyncCall(input: Int, completion: @escaping (String, Error?) -> Void) {
DispatchQueue.global(qos: .background).async {
sleep(.random(in: 1...5)) // wait for random Async API output
completion("\(input)", nil)
}
}
// There are some input values to be processed serially
let inputValues = Array(1...10)
// Prepare one pipeline item based on Future, which transforms async -> sync
func makeFuture(input: Int) -> AnyPublisher<Bool, Error> {
Future<String, Error> { promise in
asyncCall(input: input) { (value, error) in
if let error = error {
promise(.failure(error))
} else {
promise(.success(value))
}
}
}
.receive(on: DispatchQueue.main)
.map {
print(">> got \($0)") // << side effect of the pipeline item
return true
}
.eraseToAnyPublisher()
}
// Create a pipeline transforming the input values into a chain of Future publishers
var subscribers = Set<AnyCancellable>()
let pipeline =
inputValues
.reduce(nil as AnyPublisher<Bool, Error>?) { (chain, value) in
if let chain = chain {
return chain.flatMap { _ in
makeFuture(input: value)
}.eraseToAnyPublisher()
} else {
return makeFuture(input: value)
}
}
// Execute pipeline
pipeline?
.sink(receiveCompletion: { _ in
// << do something on completion if needed
}) { output in
print(">>>> finished with \(output)")
}
.store(in: &subscribers)
PlaygroundPage.current.needsIndefiniteExecution = true
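One thing worth checking with the Future-based approach: a Future runs its closure as soon as it is created, not when it is subscribed to. In the code above every makeFuture call after the first happens inside flatMap, so the chain still runs serially, but Futures built up front (for example mapped into an array) would all start at once. A minimal sketch that wraps the Future in Deferred so the work starts only on subscription; slowSquare is a hypothetical stand-in for asyncCall, and a semaphore replaces the playground's indefinite execution.

```swift
import Combine
import Foundation

// Hypothetical async API standing in for asyncCall: squares the input on a
// background queue after a short random pause.
func slowSquare(_ input: Int, completion: @escaping (Int) -> Void) {
    let pause = DispatchTimeInterval.milliseconds(Int.random(in: 10...50))
    DispatchQueue.global().asyncAfter(deadline: .now() + pause) {
        completion(input * input)
    }
}

// Records the order in which work starts; the serial queue guards the array.
let logQueue = DispatchQueue(label: "start-log")
var started: [Int] = []

// Deferred postpones creating the Future (and therefore starting the work)
// until something subscribes.
func makeDeferred(_ input: Int) -> AnyPublisher<Int, Never> {
    Deferred {
        Future<Int, Never> { promise in
            logQueue.sync { started.append(input) }
            slowSquare(input) { promise(.success($0)) }
        }
    }
    .eraseToAnyPublisher()
}

// Building the jobs does not start any work yet.
let jobs = (1...5).map(makeDeferred)
let startedBeforeSubscribing = logQueue.sync { started.count }

// maxPublishers: .max(1) subscribes to the next job only after the previous one finishes.
let done = DispatchSemaphore(value: 0)
var results: [Int] = []
let subscription = jobs.publisher
    .flatMap(maxPublishers: .max(1)) { $0 }
    .collect()
    .sink { values in
        results = values
        done.signal()
    }
done.wait()
```

Without Deferred, all five slowSquare calls would begin while jobs is being built, and maxPublishers could no longer keep them apart.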
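I have only tested this briefly, but on a first pass it appears that each request waits for the previous request to finish before starting.
I'm posting this solution in search of feedback. Please be critical if this isn't a good solution.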
extension Collection where Element: Publisher {
func serialize() -> AnyPublisher<Element.Output, Element.Failure>? {
// If the collection is empty, we can't just create an arbitrary publisher,
// so we return nil to indicate that we had nothing to serialize.
if isEmpty { return nil }
// We know at this point that it's safe to grab the first publisher.
let first = self.first!
// If there was only a single publisher then we can just return it.
if count == 1 { return first.eraseToAnyPublisher() }
// We're going to build up the output starting with the first publisher.
var output = first.eraseToAnyPublisher()
// We iterate over the rest of the publishers (skipping over the first.)
for publisher in self.dropFirst() {
// We build up the output by appending the next publisher.
output = output.append(publisher).eraseToAnyPublisher()
}
return output
}
}
A more succinct version of this solution (courtesy of @matt):
extension Collection where Element: Publisher {
func serialize() -> AnyPublisher<Element.Output, Element.Failure>? {
guard let start = self.first else { return nil }
return self.dropFirst().reduce(start.eraseToAnyPublisher()) {
$0.append($1).eraseToAnyPublisher()
}
}
}
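A quick way to check the succinct serialize() is to give it publishers whose delays would reverse their order if they ran concurrently. The sketch repeats the extension so it compiles on its own; the delays are arbitrary, and a semaphore stands in for the playground's indefinite execution.

```swift
import Combine
import Foundation

extension Collection where Element: Publisher {
    func serialize() -> AnyPublisher<Element.Output, Element.Failure>? {
        guard let start = self.first else { return nil }
        return self.dropFirst().reduce(start.eraseToAnyPublisher()) {
            $0.append($1).eraseToAnyPublisher()
        }
    }
}

// Earlier publishers get longer delays (160, 120, 80, 40 ms), so running them
// concurrently would deliver 4, 3, 2, 1.
let delayed: [AnyPublisher<Int, Never>] = (1...4).map { n in
    Just(n)
        .delay(for: .milliseconds((5 - n) * 40), scheduler: DispatchQueue.global())
        .eraseToAnyPublisher()
}

let finished = DispatchSemaphore(value: 0)
var received: [Int] = []
let cancellable = delayed.serialize()?
    .collect()
    .sink { values in
        received = values
        finished.signal()
    }
finished.wait()

// An empty collection has nothing to serialize.
let nothing = [AnyPublisher<Int, Never>]().serialize()
```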
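You can create a custom Subscriber whose receive(_:) returns Subscribers.Demand.max(1). In that case the subscriber requests the next value only after it has received one. The example works on a publisher of Ints, but a random delay in map mimics network traffic :-)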
import PlaygroundSupport
import SwiftUI
import Combine
class MySubscriber: Subscriber {
typealias Input = String
typealias Failure = Never
func receive(subscription: Subscription) {
print("Received subscription", Thread.current.isMainThread)
subscription.request(.max(1))
}
func receive(_ input: Input) -> Subscribers.Demand {
print("Received input: \(input)", Thread.current.isMainThread)
return .max(1)
}
func receive(completion: Subscribers.Completion<Never>) {
DispatchQueue.main.async {
print("Received completion: \(completion)", Thread.current.isMainThread)
PlaygroundPage.current.finishExecution()
}
}
}
(110...120)
.publisher.receive(on: DispatchQueue.global())
.map {
print(Thread.current.isMainThread, Thread.current)
usleep(UInt32.random(in: 10000 ... 1000000))
return String(format: "%02x", $0)
}
.subscribe(on: DispatchQueue.main)
.subscribe(MySubscriber())
print("Hello")
PlaygroundPage.current.needsIndefiniteExecution = true
The playground prints...
Hello
Received subscription true
false <NSThread: 0x600000064780>{number = 5, name = (null)}
Received input: 6e false
false <NSThread: 0x60000007cc80>{number = 9, name = (null)}
Received input: 6f false
false <NSThread: 0x60000007cc80>{number = 9, name = (null)}
Received input: 70 false
false <NSThread: 0x60000007cc80>{number = 9, name = (null)}
Received input: 71 false
false <NSThread: 0x60000007cc80>{number = 9, name = (null)}
Received input: 72 false
false <NSThread: 0x600000064780>{number = 5, name = (null)}
Received input: 73 false
false <NSThread: 0x600000064780>{number = 5, name = (null)}
Received input: 74 false
false <NSThread: 0x60000004dc80>{number = 8, name = (null)}
Received input: 75 false
false <NSThread: 0x60000004dc80>{number = 8, name = (null)}
Received input: 76 false
false <NSThread: 0x60000004dc80>{number = 8, name = (null)}
Received input: 77 false
false <NSThread: 0x600000053400>{number = 3, name = (null)}
Received input: 78 false
Received completion: finished true
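In the example above, values come out one at a time because usleep blocks inside map; returning .max(1) from receive(_:) does not by itself wait for asynchronous work. If the work is asynchronous, a subscriber can keep its Subscription and ask for the next value only when the work reports completion. A sketch under that assumption; SerialWorker and its work closure are hypothetical names, and the demo counts how many jobs overlap.

```swift
import Combine
import Foundation

// Hypothetical subscriber: requests one value, hands it to asynchronous work,
// and requests the next value only when that work calls its completion handler.
final class SerialWorker: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    private let lock = NSLock()
    private var subscription: Subscription?
    private let work: (Int, @escaping () -> Void) -> Void

    init(work: @escaping (Int, @escaping () -> Void) -> Void) {
        self.work = work
    }

    func receive(subscription: Subscription) {
        lock.lock(); self.subscription = subscription; lock.unlock()
        subscription.request(.max(1))
    }

    func receive(_ input: Int) -> Subscribers.Demand {
        work(input) { [weak self] in self?.requestNext() }
        return .none // no extra demand until the work is done
    }

    func receive(completion: Subscribers.Completion<Never>) {
        // Completion may arrive while the last value is still being processed.
        lock.lock(); subscription = nil; lock.unlock()
    }

    private func requestNext() {
        lock.lock(); let current = subscription; lock.unlock()
        current?.request(.max(1)) // outside the lock: request may call receive(_:) re-entrantly
    }
}

// Demo: each job takes about 20 ms on a background queue.
let stateLock = NSLock()
var inFlight = 0
var maxInFlight = 0
var processed: [Int] = []
let oneDone = DispatchSemaphore(value: 0)

let worker = SerialWorker { value, finished in
    stateLock.lock(); inFlight += 1; maxInFlight = max(maxInFlight, inFlight); stateLock.unlock()
    DispatchQueue.global().asyncAfter(deadline: .now() + .milliseconds(20)) {
        stateLock.lock(); inFlight -= 1; processed.append(value); stateLock.unlock()
        oneDone.signal()
        finished()
    }
}

(1...5).publisher.subscribe(worker)
for _ in 1...5 { oneDone.wait() }
```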
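UPDATE
Finally I found .flatMap(maxPublishers:), which forced me to update this interesting topic with a slightly different approach. Please note that I'm using a global queue for scheduling, not only some random delay, just to be sure that receiving a serialized stream isn't "random" or "lucky" behavior :-)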
import PlaygroundSupport
import Combine
import Foundation
PlaygroundPage.current.needsIndefiniteExecution = true
let A = (1 ... 9)
.publisher
.flatMap(maxPublishers: .max(1)) { value in
[value].publisher
.flatMap { value in
Just(value)
.delay(for: .milliseconds(Int.random(in: 0 ... 100)), scheduler: DispatchQueue.global())
}
}
.sink { value in
print(value, "A")
}
let B = (1 ... 9)
.publisher
.flatMap { value in
[value].publisher
.flatMap { value in
Just(value)
.delay(for: .milliseconds(Int.random(in: 0 ... 100)), scheduler: RunLoop.main)
}
}
.sink { value in
print(" ",value, "B")
}
Prints:
1 A
4 B
5 B
7 B
1 B
2 B
8 B
6 B
2 A
3 B
9 B
3 A
4 A
5 A
6 A
7 A
8 A
9 A
Based on what is written here, the .serialize()? defined by Clay Ellis in the accepted answer can be replaced with
.publisher.flatMap(maxPublishers: .max(1)){$0}
while the "unserialized" version must use
.publisher.flatMap{$0}
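Following that observation, the optional return type of serialize() can also be avoided, because an empty collection can simply produce a publisher that finishes immediately. A sketch of that variant; serialized() is a hypothetical name chosen so it does not clash with the append-based serialize().

```swift
import Combine

extension Collection where Element: Publisher {
    // Hypothetical non-optional variant: subscribes to one element at a time,
    // moving to the next only after the previous one finishes.
    // An empty collection yields a publisher that simply finishes.
    func serialized() -> AnyPublisher<Element.Output, Element.Failure> {
        publisher
            .setFailureType(to: Element.Failure.self)
            .flatMap(maxPublishers: .max(1)) { $0 }
            .eraseToAnyPublisher()
    }
}

// Usage
var letters: [String] = []
let lettersToken = ["a", "b", "c"].map { Just($0) }.serialized()
    .sink { letters.append($0) }

var emptyFinished = false
let emptyToken = [Just<String>]().serialized()
    .sink(receiveCompletion: { _ in emptyFinished = true }, receiveValue: { _ in })
```

setFailureType is there so the inner publishers may have any Failure type, not only Never.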
"real world example"
import PlaygroundSupport
import Foundation
import Combine
let host = "postman-echo.com"
let urls: [URL] = "... which proves the downloads are happening serially .-)".map(String.init).compactMap { (parameter) in
var components = URLComponents()
components.scheme = "https"
components.host = host
components.path = "/get"
components.queryItems = [URLQueryItem(name: parameter, value: nil)]
return components.url
}
// each URL looks like https://postman-echo.com/get?<one character of the sentence>
struct Postman: Decodable {
var args: [String: String]
}
let collection = urls.map { value in
URLSession.shared.dataTaskPublisher(for: value)
.tryMap { data, response -> Data in
return data
}
.decode(type: Postman.self, decoder: JSONDecoder())
.catch {_ in
Just(Postman(args: [:]))
}
}
extension Collection where Element: Publisher {
func serialize() -> AnyPublisher<Element.Output, Element.Failure>? {
guard let start = self.first else { return nil }
return self.dropFirst().reduce(start.eraseToAnyPublisher()) {
return $0.append($1).eraseToAnyPublisher()
}
}
}
var streamA = ""
let A = collection
.publisher.flatMap{$0}
.sink(receiveCompletion: { (c) in
print(streamA, " ", c, " .publisher.flatMap{$0}")
}, receiveValue: { (postman) in
print(postman.args.keys.joined(), terminator: "", to: &streamA)
})
var streamC = ""
let C = collection
.serialize()?
.sink(receiveCompletion: { (c) in
print(streamC, " ", c, " .serialize()?")
}, receiveValue: { (postman) in
print(postman.args.keys.joined(), terminator: "", to: &streamC)
})
var streamD = ""
let D = collection
.publisher.flatMap(maxPublishers: .max(1)){$0}
.sink(receiveCompletion: { (c) in
print(streamD, " ", c, " .publisher.flatMap(maxPublishers: .max(1)){$0}")
}, receiveValue: { (postman) in
print(postman.args.keys.joined(), terminator: "", to: &streamD)
})
PlaygroundPage.current.needsIndefiniteExecution = true
Prints:
.w.h i.c hporves ht edownloadsa erh appeninsg eriall y.-) finished .publisher.flatMap{$0}
... which proves the downloads are happening serially .-) finished .publisher.flatMap(maxPublishers: .max(1)){$0}
... which proves the downloads are happening serially .-) finished .serialize()?
In my opinion it is useful in other scenarios as well. Try the default value of maxPublishers in the next snippet and compare the results :-)
import Combine
let sequencePublisher = Publishers.Sequence<Range<Int>, Never>(sequence: 0..<Int.max)
let subject = PassthroughSubject<String, Never>()
let handle = subject
.zip(sequencePublisher.print())
//.publish
.flatMap(maxPublishers: .max(1), { (pair) in
Just(pair)
})
.print()
.sink { letters, digits in
print(letters, digits)
}
"Hello World!".map(String.init).forEach { (s) in
subject.send(s)
}
subject.send(completion: .finished)
From the original question:
I did try making an array of the URLs and map it to an array of publishers. I know I can "produce" a publisher and cause it to publish on down the pipeline using flatMap. But then I'm still doing all the downloading simultaneously. There isn't any Combine way to walk the array in a controlled manner — or is there?
Here is a toy example to stand in for the real problem:
let collection = (1 ... 10).map {
Just($0).delay(
for: .seconds(Double.random(in:1...5)),
scheduler: DispatchQueue.main)
.eraseToAnyPublisher()
}
collection.publisher
.flatMap() {$0}
.sink {print($0)}.store(in:&self.storage)
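This emits the integers 1 through 10 in random order at random times. The goal is to do something with collection that will cause it to emit the integers 1 through 10 in order.
Now we change just one thing: in the line
.flatMap {$0}
we add the maxPublishers parameter: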
let collection = (1 ... 10).map {
Just($0).delay(
for: .seconds(Double.random(in:1...5)),
scheduler: DispatchQueue.main)
.eraseToAnyPublisher()
}
collection.publisher
.flatMap(maxPublishers:.max(1)) {$0}
.sink {print($0)}.store(in:&self.storage)
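Presto, we now do emit the integers 1 through 10, in order, with random intervals between them.
Let's apply this to the original problem. To demonstrate, I need a fairly slow Internet connection and a fairly large resource to download. First, I'll do it with an ordinary .flatMap: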
let eph = URLSessionConfiguration.ephemeral
let session = URLSession(configuration: eph)
let url = "https://photojournal.jpl.nasa.gov/tiff/PIA23172.tif"
let collection = [url, url, url]
.map {URL(string:$0)!}
.map {session.dataTaskPublisher(for: $0)
.eraseToAnyPublisher()
}
collection.publisher.setFailureType(to: URLError.self)
.handleEvents(receiveOutput: {_ in print("start")})
.flatMap() {$0}
.map {$0.data}
.sink(receiveCompletion: {comp in
switch comp {
case .failure(let err): print("error", err)
case .finished: print("finished")
}
}, receiveValue: {_ in print("done")})
.store(in:&self.storage)
The result is
start
start
start
done
done
done
finished
That shows we are doing the three downloads simultaneously. Okay, now change
.flatMap() {$0}
to
.flatMap(maxPublishers:.max(1)) {$0}
The result now is:
start
done
start
done
start
done
finished
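So we are now downloading serially, which is the problem originally to be solved.
APPEND
In keeping with the principle of TIMTOWTDI, we can instead chain the publishers with append to serialize them: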
let collection = (1 ... 10).map {
Just($0).delay(
for: .seconds(Double.random(in:1...5)),
scheduler: DispatchQueue.main)
.eraseToAnyPublisher()
}
let pub = collection.dropFirst().reduce(collection.first!) {
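return $0.append($1).eraseToAnyPublisher()
}
The result is a publisher that serializes the delayed publishers in the original collection. Let's prove it by subscribing to it:
pub.sink {print($0)}.store(in:&self.storage)
Sure enough, the integers now arrive in order (with random intervals between them).
We can encapsulate the creation of pub from a collection of publishers with an extension on Collection, as suggested by Clay Ellis: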
extension Collection where Element: Publisher {
func serialize() -> AnyPublisher<Element.Output, Element.Failure>? {
guard let start = self.first else { return nil }
return self.dropFirst().reduce(start.eraseToAnyPublisher()) {
return $0.append($1).eraseToAnyPublisher()
}
}
}
Use flatMap(maxPublishers:transform:) with .max(1), e.g.
func imagesPublisher(for urls: [URL]) -> AnyPublisher<UIImage, URLError> {
Publishers.Sequence(sequence: urls.map { self.imagePublisher(for: $0) })
.flatMap(maxPublishers: .max(1)) { $0 }
.eraseToAnyPublisher()
}
where
func imagePublisher(for url: URL) -> AnyPublisher<UIImage, URLError> {
URLSession.shared.dataTaskPublisher(for: url)
.compactMap { UIImage(data: $0.data) }
.receive(on: RunLoop.main)
.eraseToAnyPublisher()
}
and
var imageRequests: AnyCancellable?
func fetchImages() {
imageRequests = imagesPublisher(for: urls).sink { completion in
switch completion {
case .finished:
print("done")
case .failure(let error):
print("failed", error)
}
} receiveValue: { image in
// do whatever you want with the images as they come in
}
}
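That results in:
But we should recognize that you take a big performance hit by doing them sequentially like that. For example, if I bump it up to 6 at a time, it's more than twice as fast:
Personally, I'd recommend downloading sequentially only if you absolutely must (which, when downloading a series of images/files, is almost certainly not the case). Yes, performing requests concurrently can result in them not finishing in any particular order, but we just use a structure that is order-independent (e.g. a dictionary rather than a simple array), and the performance gains are significant enough that it is generally worth it.
But if you do want them downloaded sequentially, the maxPublishers parameter can achieve that.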
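The middle ground described above, a handful of requests at a time with results kept in an order-independent structure, might be sketched as follows. fetchAll, maxConcurrent and the index-keyed dictionary are assumptions for illustration, and delayed Just values stand in for real dataTaskPublisher downloads.

```swift
import Combine
import Foundation

// Hypothetical helper: runs at most `maxConcurrent` of the publishers at once and
// gathers their outputs into a dictionary keyed by original position,
// so completion order does not matter.
func fetchAll<P: Publisher>(_ jobs: [P], maxConcurrent: Int) -> AnyPublisher<[Int: P.Output], P.Failure> {
    jobs.enumerated().publisher
        .setFailureType(to: P.Failure.self)
        .flatMap(maxPublishers: .max(maxConcurrent)) { pair in
            pair.element.map { (pair.offset, $0) }
        }
        .collect()
        .map { Dictionary(uniqueKeysWithValues: $0) }
        .eraseToAnyPublisher()
}

// Demo: later jobs take less time, so results do not arrive in input order.
let counterLock = NSLock()
var running = 0
var peak = 0

let fakeJobs: [AnyPublisher<String, Never>] = (0..<5).map { i in
    Deferred { () -> AnyPublisher<String, Never> in
        counterLock.lock(); running += 1; peak = max(peak, running); counterLock.unlock()
        return Just("item \(i)")
            .delay(for: .milliseconds((5 - i) * 30), scheduler: DispatchQueue.global())
            .handleEvents(receiveCompletion: { _ in
                counterLock.lock(); running -= 1; counterLock.unlock()
            })
            .eraseToAnyPublisher()
    }
    .eraseToAnyPublisher()
}

let allDone = DispatchSemaphore(value: 0)
var byIndex: [Int: String] = [:]
let token = fetchAll(fakeJobs, maxConcurrent: 2).sink { result in
    byIndex = result
    allDone.signal()
}
allDone.wait()

// Recover the original order from the keys.
let ordered = byIndex.keys.sorted().map { byIndex[$0]! }
```

Setting maxConcurrent to 1 gives the fully serial behavior; larger values trade ordering for throughput, which the dictionary then makes irrelevant.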
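What about a dynamic array of URLs, something like a data bus?
var array: [AnyPublisher<Data, URLError>] = []
array.append(Task()) // Task() stands for some download publisher
let cancellable = array.publisher
.flatMap { $0 }
.sink(receiveCompletion: { _ in }, receiveValue: { _ in })
// array.publisher took a copy of the array and finishes after its last element,
// so the publishers appended below are never picked up
array.append(Task())
array.append(Task())
array.append(Task())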
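One way to keep such a queue open is to feed the publishers through a subject instead of an array, since a subject does not finish until it is sent a completion. A sketch, assuming an unbounded buffer is acceptable (a PassthroughSubject drops values that arrive when there is no outstanding demand, so something has to hold jobs while one is running); job(_:) is a hypothetical stand-in for a download.

```swift
import Combine
import Foundation

// Hypothetical stand-in for a download: emits its id after a short delay.
func job(_ id: Int) -> AnyPublisher<Int, Never> {
    Just(id)
        .delay(for: .milliseconds(20), scheduler: DispatchQueue.global())
        .eraseToAnyPublisher()
}

// A subject stays open until it is sent a completion, so jobs can keep arriving.
let incoming = PassthroughSubject<AnyPublisher<Int, Never>, Never>()
let arrived = DispatchSemaphore(value: 0)
var outputs: [Int] = []

let pipeline = incoming
    // Hold on to jobs that arrive while another one is still running.
    .buffer(size: .max, prefetch: .keepFull, whenFull: .dropNewest)
    .flatMap(maxPublishers: .max(1)) { $0 }
    .sink { value in
        outputs.append(value)
        arrived.signal()
    }

incoming.send(job(1))
arrived.wait()           // the first job has finished...
incoming.send(job(2))    // ...but the pipeline is still open, so more jobs can be added
incoming.send(job(3))
arrived.wait()
arrived.wait()
incoming.send(completion: .finished)
```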
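Another approach, if you want to collect all the results of the downloads in order to know which ones failed and which ones did not, is to write a custom publisher that looks like this: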
extension Publishers {
struct Serialize<Upstream: Publisher>: Publisher {
typealias Output = [Result<Upstream.Output, Upstream.Failure>]
typealias Failure = Never
let upstreams: [Upstream]
init<C: Collection>(_ upstreams: C) where C.Element == Upstream {
self.upstreams = Array(upstreams)
}
init(_ upstreams: Upstream...) {
self.upstreams = upstreams
}
func receive<S>(subscriber: S) where S : Subscriber, Self.Failure == S.Failure, Self.Output == S.Input {
guard let first = upstreams.first else { return Empty().subscribe(subscriber) }
first
.map { Result<Upstream.Output, Upstream.Failure>.success($0) }
.catch { Just(Result<Upstream.Output, Upstream.Failure>.failure($0)) }
.map { [$0] }
.append(Serialize(upstreams.dropFirst()))
.collect()
.map { $0.flatMap { $0 } }
.subscribe(subscriber)
}
}
}
extension Collection where Element: Publisher {
func serializedPublishers() -> Publishers.Serialize<Element> {
.init(self)
}
}
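The publisher takes the first download task, converts its output/failure into a Result instance, and prepends it to the "recursive" call for the rest of the list.
Usage: Publishers.Serialize(listOfDownloadTasks), or listOfDownloadTasks.serializedPublishers().
One minor inconvenience of this implementation is that the Result instance needs to be wrapped into an array, only to be flattened three steps later in the pipeline. Perhaps someone can suggest a better alternative to that.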
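One possible alternative that avoids the wrap-and-flatten step, assuming a single [Result] at the end is what is wanted: map each upstream to a Result inside flatMap(maxPublishers: .max(1)) and call collect() once on the whole stream. serializedResults() and DemoError are hypothetical names used only for this sketch.

```swift
import Combine

extension Collection where Element: Publisher {
    // Runs the publishers one after another and reports each value or failure
    // as a Result, so one failed upstream does not stop the rest.
    func serializedResults() -> AnyPublisher<[Result<Element.Output, Element.Failure>], Never> {
        typealias Outcome = Result<Element.Output, Element.Failure>
        return publisher
            .flatMap(maxPublishers: .max(1)) { upstream in
                upstream
                    .map { Outcome.success($0) }
                    .catch { Just(Outcome.failure($0)) }
            }
            .collect()
            .eraseToAnyPublisher()
    }
}

// Demo with one failing upstream in the middle.
enum DemoError: Error { case boom }

let mixed: [AnyPublisher<Int, DemoError>] = [
    Just(1).setFailureType(to: DemoError.self).eraseToAnyPublisher(),
    Fail<Int, DemoError>(error: .boom).eraseToAnyPublisher(),
    Just(3).setFailureType(to: DemoError.self).eraseToAnyPublisher(),
]

var outcomes: [Result<Int, DemoError>] = []
let outcomesToken = mixed.serializedResults().sink { outcomes = $0 }
```

Because catch turns every failure into a value, the outer stream never fails, and the recursion in Publishers.Serialize is no longer needed.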