RxSwift 跟踪多个 observables activity
RxSwift track multiple observables activity
我有以下代码:
let categoriesRequest = APIService.MappableRequest(Category.self, resource: .categories)
let categoriesRequestResult = api.subscribeArrayRequest(categoriesRequest, from: actionRequest, disposedBy: disposeBag)
let newCategories = categoriesRequestResult
.map { [=11=].element }
.filterNil()
let categoriesUpdateData = DatabaseService.UpdateData(newObjectsObservable: newCategories)
let categoriesDatabaseResult = database.subscribeUpdates(from: categoriesUpdateData, disposedBy: disposeBag)
let latestTopicsRequest = APIService.MappableRequest(Topic.self, resource: .latestTopics)
let latestTopicsRequestResult = api.subscribeArrayRequest(latestTopicsRequest, from: actionRequest, disposedBy: disposeBag)
let newLastTopics = latestTopicsRequestResult
.map { [=11=].element }
.filterNil()
let latestTopicsUpdateData = DatabaseService.UpdateData(newObjectsObservable: newLastTopics)
let latestTopicsDatabaseResult = database.subscribeUpdates(from: latestTopicsUpdateData, disposedBy: disposeBag)
有两个请求从同一个发布主题 actionRequest
开始,并且在这些请求之后有两个数据库更新。
我需要 isActive
之类的布尔值,如果任何 api/database 任务正在进行,它将 return true
。可能吗?我在 RxSwift 示例中看到 ActivityIndicator,但我不知道是否可以在我的案例中使用它。
如果需要,来自 api/database 的代码:
// API
func subscribeArrayRequest<T>(_ request: MappableRequest<T>,
from triggerObservable: Observable<Void>,
disposedBy disposeBag: DisposeBag) -> Observable<Event<[T]>> {
let result = ReplaySubject<Event<[T]>>.create(bufferSize: 1)
triggerObservable
.flatMapLatest {
SessionManager
.jsonArrayObservable(with: request.urlRequest, isSecured: request.isSecured)
.mapResponse(on: APIService.mappingSheduler) { Mapper<T>().mapArray(JSONArray: [=12=]) }
.materialize()
}
.subscribe(onNext: { [weak result] event in
result?.onNext(event)
})
.disposed(by: disposeBag)
return result
}
// Database
func subscribeUpdates<N, P>(from data: UpdateData<N, P>, disposedBy disposeBag: DisposeBag) -> Observable<Void> {
let result = PublishSubject<Void>()
data.newObjectsObservable
.observeOn(DatabaseService.writingSheduler)
.subscribe(onNext: { [weak result] newObjects in
// update db
DispatchQueue.main.async {
result?.onNext(())
}
})
.disposed(by: disposeBag)
return result
}
谢谢。
找到以下解决方案:将 ActivityIndicator 实例传递给服务方法并像这样使用它:
func bindArrayRequest<T>(_ request: MappableRequest<T>,
from triggerObservable: Observable<Void>,
trackedBy indicator: RxActivityIndicator,
disposedBy disposeBag: DisposeBag) -> Observable<Event<[T]>> {
let result = ReplaySubject<Event<[T]>>.create(bufferSize: 1)
triggerObservable
.flatMapLatest {
SessionManager
.jsonArrayObservable(with: request.urlRequest, isSecured: request.isSecured)
.mapResponse(on: APIService.mappingSheduler) { Mapper<T>().mapArray(JSONArray: [=10=]) }
.materialize()
.trackActivity(indicator)
}
.bind(to: result)
.disposed(by: disposeBag)
return result
}
还将数据库更新逻辑包装到 observable 中,并将其与 .trackActivity(indicator)
一起使用到 flatMapLatest
中,就像在 api.
中一样
然后使用 api/database 方法,如下所示:
let indicator = RxActivityIndicator()
isUpdatingData = indicator.asObservable() // needed bool observable
let categoriesRequest = APIService.MappableRequest(Category.self, resource: .categories)
let categoriesRequestResult =
api.bindArrayRequest(categoriesRequest,
from: actionRequest,
trackedBy: indicator,
disposedBy: disposeBag)
let newCategories = categoriesRequestResult
.map { [=11=].element }
.filterNil()
let categoriesUpdateData = DatabaseService.UpdateData(newObjectsObservable: newCategories)
let categoriesDatabaseResult =
database.bindUpdates(from: categoriesUpdateData,
trackedBy: indicator,
disposedBy: disposeBag)
let latestTopicsRequest = APIService.MappableRequest(Topic.self, resource: .latestTopics)
let latestTopicsRequestResult =
api.bindArrayRequest(latestTopicsRequest,
from: actionRequest,
trackedBy: indicator,
disposedBy: disposeBag)
let newLastTopics = latestTopicsRequestResult
.map { [=11=].element }
.filterNil()
let latestTopicsUpdateData = DatabaseService.UpdateData(newObjectsObservable: newLastTopics)
let latestTopicsDatabaseResult =
database.bindUpdates(from: latestTopicsUpdateData,
trackedBy: indicator,
disposedBy: disposeBag)
isUpdatingData
observable 正是我所需要的。
我有以下代码:
let categoriesRequest = APIService.MappableRequest(Category.self, resource: .categories)
let categoriesRequestResult = api.subscribeArrayRequest(categoriesRequest, from: actionRequest, disposedBy: disposeBag)
let newCategories = categoriesRequestResult
.map { [=11=].element }
.filterNil()
let categoriesUpdateData = DatabaseService.UpdateData(newObjectsObservable: newCategories)
let categoriesDatabaseResult = database.subscribeUpdates(from: categoriesUpdateData, disposedBy: disposeBag)
let latestTopicsRequest = APIService.MappableRequest(Topic.self, resource: .latestTopics)
let latestTopicsRequestResult = api.subscribeArrayRequest(latestTopicsRequest, from: actionRequest, disposedBy: disposeBag)
let newLastTopics = latestTopicsRequestResult
.map { [=11=].element }
.filterNil()
let latestTopicsUpdateData = DatabaseService.UpdateData(newObjectsObservable: newLastTopics)
let latestTopicsDatabaseResult = database.subscribeUpdates(from: latestTopicsUpdateData, disposedBy: disposeBag)
有两个请求从同一个发布主题 actionRequest
开始,并且在这些请求之后有两个数据库更新。
我需要 isActive
之类的布尔值,如果任何 api/database 任务正在进行,它将 return true
。可能吗?我在 RxSwift 示例中看到 ActivityIndicator,但我不知道是否可以在我的案例中使用它。
如果需要,来自 api/database 的代码:
// API
func subscribeArrayRequest<T>(_ request: MappableRequest<T>,
from triggerObservable: Observable<Void>,
disposedBy disposeBag: DisposeBag) -> Observable<Event<[T]>> {
let result = ReplaySubject<Event<[T]>>.create(bufferSize: 1)
triggerObservable
.flatMapLatest {
SessionManager
.jsonArrayObservable(with: request.urlRequest, isSecured: request.isSecured)
.mapResponse(on: APIService.mappingSheduler) { Mapper<T>().mapArray(JSONArray: [=12=]) }
.materialize()
}
.subscribe(onNext: { [weak result] event in
result?.onNext(event)
})
.disposed(by: disposeBag)
return result
}
// Database
func subscribeUpdates<N, P>(from data: UpdateData<N, P>, disposedBy disposeBag: DisposeBag) -> Observable<Void> {
let result = PublishSubject<Void>()
data.newObjectsObservable
.observeOn(DatabaseService.writingSheduler)
.subscribe(onNext: { [weak result] newObjects in
// update db
DispatchQueue.main.async {
result?.onNext(())
}
})
.disposed(by: disposeBag)
return result
}
谢谢。
找到以下解决方案:将 ActivityIndicator 实例传递给服务方法并像这样使用它:
func bindArrayRequest<T>(_ request: MappableRequest<T>,
from triggerObservable: Observable<Void>,
trackedBy indicator: RxActivityIndicator,
disposedBy disposeBag: DisposeBag) -> Observable<Event<[T]>> {
let result = ReplaySubject<Event<[T]>>.create(bufferSize: 1)
triggerObservable
.flatMapLatest {
SessionManager
.jsonArrayObservable(with: request.urlRequest, isSecured: request.isSecured)
.mapResponse(on: APIService.mappingSheduler) { Mapper<T>().mapArray(JSONArray: [=10=]) }
.materialize()
.trackActivity(indicator)
}
.bind(to: result)
.disposed(by: disposeBag)
return result
}
还将数据库更新逻辑包装到 observable 中,并将其与 .trackActivity(indicator)
一起使用到 flatMapLatest
中,就像在 api.
然后使用 api/database 方法,如下所示:
let indicator = RxActivityIndicator()
isUpdatingData = indicator.asObservable() // needed bool observable
let categoriesRequest = APIService.MappableRequest(Category.self, resource: .categories)
let categoriesRequestResult =
api.bindArrayRequest(categoriesRequest,
from: actionRequest,
trackedBy: indicator,
disposedBy: disposeBag)
let newCategories = categoriesRequestResult
.map { [=11=].element }
.filterNil()
let categoriesUpdateData = DatabaseService.UpdateData(newObjectsObservable: newCategories)
let categoriesDatabaseResult =
database.bindUpdates(from: categoriesUpdateData,
trackedBy: indicator,
disposedBy: disposeBag)
let latestTopicsRequest = APIService.MappableRequest(Topic.self, resource: .latestTopics)
let latestTopicsRequestResult =
api.bindArrayRequest(latestTopicsRequest,
from: actionRequest,
trackedBy: indicator,
disposedBy: disposeBag)
let newLastTopics = latestTopicsRequestResult
.map { [=11=].element }
.filterNil()
let latestTopicsUpdateData = DatabaseService.UpdateData(newObjectsObservable: newLastTopics)
let latestTopicsDatabaseResult =
database.bindUpdates(from: latestTopicsUpdateData,
trackedBy: indicator,
disposedBy: disposeBag)
isUpdatingData
observable 正是我所需要的。