Apple Combine framework: How to execute multiple Publishers in parallel and wait for all of them to finish?
I am exploring Combine. I wrote methods that make HTTP requests the "Combine" way, for example:
func testRawDataTaskPublisher(for url: URL) -> AnyPublisher<Data, Error> {
    var request = URLRequest(url: url,
                             cachePolicy: .useProtocolCachePolicy,
                             timeoutInterval: 15)
    request.httpMethod = "GET"
    return urlSession.dataTaskPublisher(for: request)
        .tryMap {
            return $0.data
        }
        .eraseToAnyPublisher()
}
I would like to call that method several times and run a task once all the calls have finished, for example:
let myURLs: [URL] = ...
for url in myURLs {
    let cancellable = testRawDataTaskPublisher(for: url)
        .sink(receiveCompletion: { _ in }) { data in
            // save the data...
        }
}
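The code above does not work because I have to store the cancellable in a variable that belongs to the class.
The first question is: is it a good idea to store many (for example 1000) cancellables in something like Set<AnyCancellable>? Won't that cause memory leaks?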
var cancellables = Set<AnyCancellable>()
...
let cancellable = ...
cancellables.insert(cancellable) // ???
The second question is: how do I start a task once all the cancellables have finished? I was thinking about something like this:
class Test {
    var cancellables = Set<AnyCancellable>()

    func run() {
        // show a loader
        let cancellable = runDownloads()
            .receive(on: RunLoop.main)
            .sink(receiveCompletion: { _ in }) { _ in
                // hide the loader
            }
        cancellables.insert(cancellable)
    }

    func runDownloads() -> AnyPublisher<Bool, Error> {
        let myURLs: [URL] = ...
        return Future<Bool, Error> { promise in
            let numberOfURLs = myURLs.count
            var numberOfFinishedTasks = 0
            for url in myURLs {
                let cancellable = self.testRawDataTaskPublisher(for: url)
                    .sink(receiveCompletion: { _ in }) { data in
                        // save the data...
                        numberOfFinishedTasks += 1
                        if numberOfFinishedTasks >= numberOfURLs {
                            promise(.success(true))
                        }
                    }
                self.cancellables.insert(cancellable)
            }
        }.eraseToAnyPublisher()
    }

    func testRawDataTaskPublisher(for url: URL) -> AnyPublisher<Data, Error> {
        ...
    }
}
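Normally I would use DispatchGroup: start multiple HTTP tasks and get notified when they have all finished. But I am wondering how to write this in a modern way using Combine.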
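You can run some operations in parallel by creating a collection of publishers, applying the flatMap operator, and then using collect to wait for all of the publishers to complete before continuing. Here is an example that you can run in a playground: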
import Combine
import Foundation

func delayedPublisher<Value>(_ value: Value, delay after: Double) -> AnyPublisher<Value, Never> {
    let p = PassthroughSubject<Value, Never>()
    DispatchQueue.main.asyncAfter(deadline: .now() + after) {
        p.send(value)
        p.send(completion: .finished)
    }
    return p.eraseToAnyPublisher()
}

let myPublishers = [1,2,3]
    .map{ delayedPublisher($0, delay: 1 / Double($0)).print("\($0)").eraseToAnyPublisher() }

let cancel = myPublishers
    .publisher
    .flatMap { $0 }
    .collect()
    .sink { result in
        print("result:", result)
    }
Here is the output:
1: receive subscription: (PassthroughSubject)
1: request unlimited
2: receive subscription: (PassthroughSubject)
2: request unlimited
3: receive subscription: (PassthroughSubject)
3: request unlimited
3: receive value: (3)
3: receive finished
2: receive value: (2)
2: receive finished
1: receive value: (1)
1: receive finished
result: [3, 2, 1]
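Notice that the publishers are all started immediately (in their original order). The 1 / $0 delay causes the first publisher to take the longest to complete. Note the order of the values at the end: collect gathers values in the order they arrive, and because the first publisher took the longest, its value is the last item.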
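Applied to the downloads in the question, the same flatMap + collect pattern could look roughly like the sketch below. This is only an illustration under a few assumptions: downloadAll, fetch and maxConcurrent are made-up names (not part of Combine), fetch stands in for something like testRawDataTaskPublisher(for:), and the index bookkeeping is one possible way to get the results back in input order, since collect alone returns them in completion order.

```swift
import Combine
import Foundation

/// Runs `fetch` for every URL, at most `maxConcurrent` at a time, and emits
/// one array after every request has finished.
/// `downloadAll`, `fetch` and `maxConcurrent` are hypothetical names.
func downloadAll(
    _ urls: [URL],
    maxConcurrent: Int = 4,
    fetch: @escaping (URL) -> AnyPublisher<Data, Error>
) -> AnyPublisher<[Data], Error> {
    Array(urls.enumerated()).publisher            // emits (offset, element) pairs
        .setFailureType(to: Error.self)           // match the failure type of `fetch`
        .flatMap(maxPublishers: .max(maxConcurrent)) { pair in
            // Tag each result with the position of its URL in the input.
            fetch(pair.element).map { (pair.offset, $0) }
        }
        .collect()                                // waits until all inner publishers finish
        .map { pairs -> [Data] in
            // Results arrive in completion order; sort back into input order.
            pairs.sorted { $0.0 < $1.0 }.map { $0.1 }
        }
        .eraseToAnyPublisher()
}
```

With this shape, run() would only need to keep the single cancellable returned by sink on downloadAll(myURLs, fetch: testRawDataTaskPublisher(for:)), because that one subscription keeps the whole pipeline alive, and the loader can be hidden in receiveCompletion. Note that the first failing request ends the whole pipeline with that error; if individual failures should be tolerated, each inner publisher would have to handle its own error (for example with catch or replaceError) before it reaches flatMap.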