从其他 Observable 获取价值的 Rx Observable
Rx Observable that gets value from other Observable
我是 RxSwift 和 MVVM 的新手。
我的 viewModel 有一个名为 rx_fetchItems(for:)
的方法,它负责从后端获取相关内容,returns Observable<[Item]>
。
我的目标是提供名为 collectionItems
的视图模型的可观察对象 属性,以及从 rx_fetchItems(for:)
返回的最后一个发出的元素,以便为我的 collectionView 提供数据。
Daniel T 提供了我可能会使用的解决方案:
protocol ServerAPI {
func rx_fetchItems(for category: ItemCategory) -> Observable<[Item]>
}
struct ViewModel {
let collectionItems: Observable<[Item]>
let error: Observable<Error>
init(controlValue: Observable<Int>, api: ServerAPI) {
let serverItems = controlValue
.map { ItemCategory(rawValue: [=10=]) }
.filter { [=10=] != nil }.map { [=10=]! } // or use a `filterNil` operator if you already have one implemented.
.flatMap { api.rx_fetchItems(for: [=10=])
.materialize()
}
.filter { [=10=].isCompleted == false }
.shareReplayLatestWhileConnected()
collectionItems = serverItems.filter { [=10=].element != nil }.dematerialize()
error = serverItems.filter { [=10=].error != nil }.map { [=10=].error! }
}
}
这里唯一的问题是我当前的 ServerAPI aka FirebaseAPI 没有这样的协议方法,因为它是用一个单一的方法设计的,可以像这样触发所有请求:
class FirebaseAPI {
private let session: URLSession
init() {
self.session = URLSession.shared
}
/// Responsible for Making actual API requests & Handling response
/// Returns an observable object that conforms to JSONable protocol.
/// Entities that confrom to JSONable just means they can be initialized with json.
func rx_fireRequest<Entity: JSONable>(_ endpoint: FirebaseEndpoint, ofType _: Entity.Type ) -> Observable<[Entity]> {
return Observable.create { [weak self] observer in
self?.session.dataTask(with: endpoint.request, completionHandler: { (data, response, error) in
/// Parse response from request.
let parsedResponse = Parser(data: data, response: response, error: error)
.parse()
switch parsedResponse {
case .error(let error):
observer.onError(error)
return
case .success(let data):
var entities = [Entity]()
switch endpoint.method {
/// Flatten JSON strucuture to retrieve a list of entities.
/// Denoted by 'GETALL' method.
case .GETALL:
/// Key (underscored) is unique identifier for each entity, which is not needed here.
/// value is k/v pairs of entity attributes.
for (_, value) in data {
if let value = value as? [String: AnyObject], let entity = Entity(json: value) {
entities.append(entity)
}
}
// Need to force downcast for generic type inference.
observer.onNext(entities as! [Entity])
observer.onCompleted()
/// All other methods return JSON that can be used to initialize JSONable entities
default:
if let entity = Entity(json: data) {
observer.onNext([entity] as! [Entity])
observer.onCompleted()
} else {
observer.onError(NetworkError.initializationFailure)
}
}
}
}).resume()
return Disposables.create()
}
}
}
rx_fireRequest
方法最重要的一点是它接受了一个 FirebaseEndpoint
.
/// Conforms to Endpoint protocol in extension, so one of these enum members will be the input for FirebaseAPI's `fireRequest` method.
enum FirebaseEndpoint {
case saveUser(data: [String: AnyObject])
case fetchUser(id: String)
case removeUser(id: String)
case saveItem(data: [String: AnyObject])
case fetchItem(id: String)
case fetchItems
case removeItem(id: String)
case saveMessage(data: [String: AnyObject])
case fetchMessages(chatroomId: String)
case removeMessage(id: String)
}
为了使用 Daniel T 的解决方案,我必须将每个枚举案例从 FirebaseEndpoint
转换为 FirebaseAPI
中的方法。在每个方法中,调用 rx_fireRequest
... 如果我是正确的。
我很想做出这个改变,如果它能带来更好的服务器 API 设计。所以简单的问题是,这种重构是否会改进我的整体 API 设计以及它与 ViewModels 的交互方式。我意识到这现在正在演变为代码审查。
另外...这是该协议方法及其助手的实现:
func rx_fetchItems(for category: ItemCategory) -> Observable<[Item]> {
// fetched items returns all items in database as Observable<[Item]>
let fetchedItems = client.rx_fireRequest(.fetchItems, ofType: Item.self)
switch category {
case .Local:
let localItems = fetchedItems
.flatMapLatest { [weak self] (itemList) -> Observable<[Item]> in
return self!.rx_localItems(items: itemList)
}
return localItems
// TODO: Handle other cases like RecentlyAdded, Trending, etc..
}
}
// Helper method to filter items for only local items nearby user.
private func rx_localItems(items: [Item]) -> Observable<[Item]> {
return Observable.create { observable in
observable.onNext(items.filter { [=13=].location == "LA" })
observable.onCompleted()
return Disposables.create()
}
}
如果我的 MVVM 或 RxSwift 或 API 设计方法有误,请批评指正。
你 运行 陷入了一个棘手的境地,因为你的 observable 可能会抛出错误,一旦它确实抛出错误,observable 序列就会出错,并且无法发出更多事件。因此,要处理后续的网络请求,您必须重新分配您当前采用的方法。但是,这通常不利于驱动 UI 元素,例如集合视图,因为您每次都必须绑定到重新分配的可观察对象。驱动 UI 元素时,您应该倾向于保证不会出错的类型(即 Variable 和 Driver)。您可以将 Observable<[Item]>
设置为 let items = Variable<[Item]>([])
,然后您可以将该变量的值设置为来自新网络请求的项目数组。您可以使用 RxDataSources 或类似的东西安全地将此变量绑定到您的集合视图。然后你可以为错误消息创建一个单独的变量,比如 let errorMessage = Variable<String?>(nil)
,用于来自网络请求的错误消息,然后你可以将 errorMessage 字符串绑定到标签或类似的东西来显示你的错误消息.
我认为您遇到的问题是您只在可观察的范式中走了一半,这让您失望了。尝试一直坚持下去,看看是否有帮助。例如:
protocol ServerAPI {
func rx_fetchItems(for category: ItemCategory) -> Observable<[Item]>
}
struct ViewModel {
let collectionItems: Observable<[Item]>
let error: Observable<Error>
init(controlValue: Observable<Int>, api: ServerAPI) {
let serverItems = controlValue
.map { ItemCategory(rawValue: [=10=]) }
.filter { [=10=] != nil }.map { [=10=]! } // or use a `filterNil` operator if you already have one implemented.
.flatMap { api.rx_fetchItems(for: [=10=])
.materialize()
}
.filter { [=10=].isCompleted == false }
.shareReplayLatestWhileConnected()
collectionItems = serverItems.filter { [=10=].element != nil }.dematerialize()
error = serverItems.filter { [=10=].error != nil }.map { [=10=].error! }
}
}
编辑以处理评论中提到的问题。您现在需要传入具有 rx_fetchItems(for:)
方法的对象。您应该有多个这样的对象:一个指向服务器,一个不指向任何服务器,而是 returns 固定数据,以便您可以测试任何可能的响应,包括错误。 (视图模型不应该直接与服务器对话,而应该通过中介来对话...
上面的秘诀是 materialize
运算符,它将错误事件包装到包含错误对象的正常事件中。这样你就可以阻止网络错误关闭整个系统。
针对您问题的变化...您可以简单地使 FirebaseAPI 符合 ServerAPI:
extension FirebaseAPI: ServerAPI {
func rx_fetchItems(for category: ItemCategory) -> Observable<[Item]> {
// fetched items returns all items in database as Observable<[Item]>
let fetchedItems = self.rx_fireRequest(.fetchItems, ofType: Item.self)
switch category {
case .Local:
let localItems = fetchedItems
.flatMapLatest { [weak self] (itemList) -> Observable<[Item]> in
return self!.rx_localItems(items: itemList)
}
return localItems
// TODO: Handle other cases like RecentlyAdded, Trending, etc..
}
}
// Helper method to filter items for only local items nearby user.
private func rx_localItems(items: [Item]) -> Observable<[Item]> {
return Observable.create { observable in
observable.onNext(items.filter { [=11=].location == "LA" })
observable.onCompleted()
return Disposables.create()
}
}
}
此时您可能应该将 ServerAPI
的名称更改为类似 FetchItemsAPI
的名称。
我知道开始理解 RxSwift 很困难
我喜欢使用 Subject
s 或 Variable
s 作为 ViewModel 和 Observable
s 或 Driver
的输入s 作为 ViewModel
的输出
通过这种方式,您可以将 ViewController 上发生的操作绑定到 ViewModel,在那里处理逻辑,并更新输出
这是重构代码的示例
查看模型
// Inputs
let didSelectItemCategory: PublishSubject<ItemCategory> = .init()
// Outputs
let items: Observable<[Item]>
init() {
let client = FirebaseAPI()
let fetchedItems = client.rx_fireRequest(.fetchItems, ofType: Item.self)
self.items = didSelectItemCategory
.withLatestFrom(fetchedItems, resultSelector: { itemCategory, fetchedItems in
switch itemCategory {
case .Local:
return fetchedItems.filter { [=10=].location == "Los Angeles" }
default: return []
}
})
}
ViewController
segmentedControl.rx.value
.map(ItemCategory.init(rawValue:))
.startWith(.Local)
.bind(to: viewModel.didSelectItemCategory)
.disposed(by: disposeBag)
viewModel.items
.subscribe(onNext: { items in
// Do something
})
.disposed(by: disposeBag)
我是 RxSwift 和 MVVM 的新手。
我的 viewModel 有一个名为 rx_fetchItems(for:)
的方法,它负责从后端获取相关内容,returns Observable<[Item]>
。
我的目标是提供名为 collectionItems
的视图模型的可观察对象 属性,以及从 rx_fetchItems(for:)
返回的最后一个发出的元素,以便为我的 collectionView 提供数据。
Daniel T 提供了我可能会使用的解决方案:
protocol ServerAPI {
func rx_fetchItems(for category: ItemCategory) -> Observable<[Item]>
}
struct ViewModel {
let collectionItems: Observable<[Item]>
let error: Observable<Error>
init(controlValue: Observable<Int>, api: ServerAPI) {
let serverItems = controlValue
.map { ItemCategory(rawValue: [=10=]) }
.filter { [=10=] != nil }.map { [=10=]! } // or use a `filterNil` operator if you already have one implemented.
.flatMap { api.rx_fetchItems(for: [=10=])
.materialize()
}
.filter { [=10=].isCompleted == false }
.shareReplayLatestWhileConnected()
collectionItems = serverItems.filter { [=10=].element != nil }.dematerialize()
error = serverItems.filter { [=10=].error != nil }.map { [=10=].error! }
}
}
这里唯一的问题是我当前的 ServerAPI aka FirebaseAPI 没有这样的协议方法,因为它是用一个单一的方法设计的,可以像这样触发所有请求:
class FirebaseAPI {
private let session: URLSession
init() {
self.session = URLSession.shared
}
/// Responsible for Making actual API requests & Handling response
/// Returns an observable object that conforms to JSONable protocol.
/// Entities that confrom to JSONable just means they can be initialized with json.
func rx_fireRequest<Entity: JSONable>(_ endpoint: FirebaseEndpoint, ofType _: Entity.Type ) -> Observable<[Entity]> {
return Observable.create { [weak self] observer in
self?.session.dataTask(with: endpoint.request, completionHandler: { (data, response, error) in
/// Parse response from request.
let parsedResponse = Parser(data: data, response: response, error: error)
.parse()
switch parsedResponse {
case .error(let error):
observer.onError(error)
return
case .success(let data):
var entities = [Entity]()
switch endpoint.method {
/// Flatten JSON strucuture to retrieve a list of entities.
/// Denoted by 'GETALL' method.
case .GETALL:
/// Key (underscored) is unique identifier for each entity, which is not needed here.
/// value is k/v pairs of entity attributes.
for (_, value) in data {
if let value = value as? [String: AnyObject], let entity = Entity(json: value) {
entities.append(entity)
}
}
// Need to force downcast for generic type inference.
observer.onNext(entities as! [Entity])
observer.onCompleted()
/// All other methods return JSON that can be used to initialize JSONable entities
default:
if let entity = Entity(json: data) {
observer.onNext([entity] as! [Entity])
observer.onCompleted()
} else {
observer.onError(NetworkError.initializationFailure)
}
}
}
}).resume()
return Disposables.create()
}
}
}
rx_fireRequest
方法最重要的一点是它接受了一个 FirebaseEndpoint
.
/// Conforms to Endpoint protocol in extension, so one of these enum members will be the input for FirebaseAPI's `fireRequest` method.
enum FirebaseEndpoint {
case saveUser(data: [String: AnyObject])
case fetchUser(id: String)
case removeUser(id: String)
case saveItem(data: [String: AnyObject])
case fetchItem(id: String)
case fetchItems
case removeItem(id: String)
case saveMessage(data: [String: AnyObject])
case fetchMessages(chatroomId: String)
case removeMessage(id: String)
}
为了使用 Daniel T 的解决方案,我必须将每个枚举案例从 FirebaseEndpoint
转换为 FirebaseAPI
中的方法。在每个方法中,调用 rx_fireRequest
... 如果我是正确的。
我很想做出这个改变,如果它能带来更好的服务器 API 设计。所以简单的问题是,这种重构是否会改进我的整体 API 设计以及它与 ViewModels 的交互方式。我意识到这现在正在演变为代码审查。
另外...这是该协议方法及其助手的实现:
func rx_fetchItems(for category: ItemCategory) -> Observable<[Item]> {
// fetched items returns all items in database as Observable<[Item]>
let fetchedItems = client.rx_fireRequest(.fetchItems, ofType: Item.self)
switch category {
case .Local:
let localItems = fetchedItems
.flatMapLatest { [weak self] (itemList) -> Observable<[Item]> in
return self!.rx_localItems(items: itemList)
}
return localItems
// TODO: Handle other cases like RecentlyAdded, Trending, etc..
}
}
// Helper method to filter items for only local items nearby user.
private func rx_localItems(items: [Item]) -> Observable<[Item]> {
return Observable.create { observable in
observable.onNext(items.filter { [=13=].location == "LA" })
observable.onCompleted()
return Disposables.create()
}
}
如果我的 MVVM 或 RxSwift 或 API 设计方法有误,请批评指正。
你 运行 陷入了一个棘手的境地,因为你的 observable 可能会抛出错误,一旦它确实抛出错误,observable 序列就会出错,并且无法发出更多事件。因此,要处理后续的网络请求,您必须重新分配您当前采用的方法。但是,这通常不利于驱动 UI 元素,例如集合视图,因为您每次都必须绑定到重新分配的可观察对象。驱动 UI 元素时,您应该倾向于保证不会出错的类型(即 Variable 和 Driver)。您可以将 Observable<[Item]>
设置为 let items = Variable<[Item]>([])
,然后您可以将该变量的值设置为来自新网络请求的项目数组。您可以使用 RxDataSources 或类似的东西安全地将此变量绑定到您的集合视图。然后你可以为错误消息创建一个单独的变量,比如 let errorMessage = Variable<String?>(nil)
,用于来自网络请求的错误消息,然后你可以将 errorMessage 字符串绑定到标签或类似的东西来显示你的错误消息.
我认为您遇到的问题是您只在可观察的范式中走了一半,这让您失望了。尝试一直坚持下去,看看是否有帮助。例如:
protocol ServerAPI {
func rx_fetchItems(for category: ItemCategory) -> Observable<[Item]>
}
struct ViewModel {
let collectionItems: Observable<[Item]>
let error: Observable<Error>
init(controlValue: Observable<Int>, api: ServerAPI) {
let serverItems = controlValue
.map { ItemCategory(rawValue: [=10=]) }
.filter { [=10=] != nil }.map { [=10=]! } // or use a `filterNil` operator if you already have one implemented.
.flatMap { api.rx_fetchItems(for: [=10=])
.materialize()
}
.filter { [=10=].isCompleted == false }
.shareReplayLatestWhileConnected()
collectionItems = serverItems.filter { [=10=].element != nil }.dematerialize()
error = serverItems.filter { [=10=].error != nil }.map { [=10=].error! }
}
}
编辑以处理评论中提到的问题。您现在需要传入具有 rx_fetchItems(for:)
方法的对象。您应该有多个这样的对象:一个指向服务器,一个不指向任何服务器,而是 returns 固定数据,以便您可以测试任何可能的响应,包括错误。 (视图模型不应该直接与服务器对话,而应该通过中介来对话...
上面的秘诀是 materialize
运算符,它将错误事件包装到包含错误对象的正常事件中。这样你就可以阻止网络错误关闭整个系统。
针对您问题的变化...您可以简单地使 FirebaseAPI 符合 ServerAPI:
extension FirebaseAPI: ServerAPI {
func rx_fetchItems(for category: ItemCategory) -> Observable<[Item]> {
// fetched items returns all items in database as Observable<[Item]>
let fetchedItems = self.rx_fireRequest(.fetchItems, ofType: Item.self)
switch category {
case .Local:
let localItems = fetchedItems
.flatMapLatest { [weak self] (itemList) -> Observable<[Item]> in
return self!.rx_localItems(items: itemList)
}
return localItems
// TODO: Handle other cases like RecentlyAdded, Trending, etc..
}
}
// Helper method to filter items for only local items nearby user.
private func rx_localItems(items: [Item]) -> Observable<[Item]> {
return Observable.create { observable in
observable.onNext(items.filter { [=11=].location == "LA" })
observable.onCompleted()
return Disposables.create()
}
}
}
此时您可能应该将 ServerAPI
的名称更改为类似 FetchItemsAPI
的名称。
我知道开始理解 RxSwift 很困难
我喜欢使用 Subject
s 或 Variable
s 作为 ViewModel 和 Observable
s 或 Driver
的输入s 作为 ViewModel
通过这种方式,您可以将 ViewController 上发生的操作绑定到 ViewModel,在那里处理逻辑,并更新输出
这是重构代码的示例
查看模型
// Inputs
let didSelectItemCategory: PublishSubject<ItemCategory> = .init()
// Outputs
let items: Observable<[Item]>
init() {
let client = FirebaseAPI()
let fetchedItems = client.rx_fireRequest(.fetchItems, ofType: Item.self)
self.items = didSelectItemCategory
.withLatestFrom(fetchedItems, resultSelector: { itemCategory, fetchedItems in
switch itemCategory {
case .Local:
return fetchedItems.filter { [=10=].location == "Los Angeles" }
default: return []
}
})
}
ViewController
segmentedControl.rx.value
.map(ItemCategory.init(rawValue:))
.startWith(.Local)
.bind(to: viewModel.didSelectItemCategory)
.disposed(by: disposeBag)
viewModel.items
.subscribe(onNext: { items in
// Do something
})
.disposed(by: disposeBag)