从其他 Observable 获取价值的 Rx Observable

Rx Observable that gets value from other Observable

我是 RxSwift 和 MVVM 的新手。

我的 viewModel 有一个名为 rx_fetchItems(for:) 的方法,它负责从后端获取相关内容,returns Observable<[Item]>

我的目标是提供名为 collectionItems 的视图模型的可观察对象 属性,以及从 rx_fetchItems(for:) 返回的最后一个发出的元素,以便为我的 collectionView 提供数据。

Daniel T 提供了我可能会使用的解决方案:

protocol ServerAPI {
    func rx_fetchItems(for category: ItemCategory) -> Observable<[Item]>
}

    struct ViewModel {

        let collectionItems: Observable<[Item]>
        let error: Observable<Error>

        init(controlValue: Observable<Int>, api: ServerAPI) {
            let serverItems = controlValue
                .map { ItemCategory(rawValue: [=10=]) }
                .filter { [=10=] != nil }.map { [=10=]! } // or use a `filterNil` operator if you already have one implemented.
                .flatMap { api.rx_fetchItems(for: [=10=])
                    .materialize()
                }
                .filter { [=10=].isCompleted == false }
                .shareReplayLatestWhileConnected()

            collectionItems = serverItems.filter { [=10=].element != nil }.dematerialize()
            error = serverItems.filter { [=10=].error != nil }.map { [=10=].error! }
        }

    }

这里唯一的问题是我当前的 ServerAPI aka FirebaseAPI 没有这样的协议方法,因为它是用一个单一的方法设计的,可以像这样触发所有请求:

class FirebaseAPI {

    private let session: URLSession

    init() {
        self.session = URLSession.shared
    }

    /// Responsible for Making actual API requests & Handling response
    /// Returns an observable object that conforms to JSONable protocol.
    /// Entities that confrom to JSONable just means they can be initialized with json.
    func rx_fireRequest<Entity: JSONable>(_ endpoint: FirebaseEndpoint, ofType _: Entity.Type ) -> Observable<[Entity]> {

        return Observable.create { [weak self] observer in
            self?.session.dataTask(with: endpoint.request, completionHandler: { (data, response, error) in

                /// Parse response from request.
                let parsedResponse = Parser(data: data, response: response, error: error)
                    .parse()

                switch parsedResponse {

                case .error(let error):
                    observer.onError(error)
                    return

                case .success(let data):

                    var entities = [Entity]()

                    switch endpoint.method {

                    /// Flatten JSON strucuture to retrieve a list of entities.
                    /// Denoted by 'GETALL' method.
                    case .GETALL:

                        /// Key (underscored) is unique identifier for each entity, which is not needed here.
                        /// value is k/v pairs of entity attributes.
                        for (_, value) in data {
                            if let value = value as? [String: AnyObject], let entity = Entity(json: value) {
                                entities.append(entity)
                            }
                        }

                        // Need to force downcast for generic type inference.
                        observer.onNext(entities as! [Entity])
                        observer.onCompleted()

                    /// All other methods return JSON that can be used to initialize JSONable entities 
                    default:
                        if let entity = Entity(json: data) {
                        observer.onNext([entity] as! [Entity])
                        observer.onCompleted()
                    } else {
                        observer.onError(NetworkError.initializationFailure)
                        }
                    }
                }
            }).resume()
            return Disposables.create()
        }
    }
}

rx_fireRequest 方法最重要的一点是它接受了一个 FirebaseEndpoint.

/// Conforms to Endpoint protocol in extension, so one of these enum members will be the input for FirebaseAPI's `fireRequest` method.

enum FirebaseEndpoint {

    case saveUser(data: [String: AnyObject])
    case fetchUser(id: String)
    case removeUser(id: String)

    case saveItem(data: [String: AnyObject])
    case fetchItem(id: String)
    case fetchItems
    case removeItem(id: String)

    case saveMessage(data: [String: AnyObject])
    case fetchMessages(chatroomId: String)
    case removeMessage(id: String)

}

为了使用 Daniel T 的解决方案,我必须将每个枚举案例从 FirebaseEndpoint 转换为 FirebaseAPI 中的方法。在每个方法中,调用 rx_fireRequest... 如果我是正确的。

我很想做出这个改变,如果它能带来更好的服务器 API 设计。所以简单的问题是,这种重构是否会改进我的整体 API 设计以及它与 ViewModels 的交互方式。我意识到这现在正在演变为代码审查。

另外...这是该协议方法及其助手的实现:

 func rx_fetchItems(for category: ItemCategory) -> Observable<[Item]>  {
        // fetched items returns all items in database as Observable<[Item]>
        let fetchedItems = client.rx_fireRequest(.fetchItems, ofType: Item.self)
        switch category {
        case .Local:
            let localItems = fetchedItems
            .flatMapLatest { [weak self] (itemList) -> Observable<[Item]> in
                return self!.rx_localItems(items: itemList)
            }

            return localItems

            // TODO: Handle other cases like RecentlyAdded, Trending, etc..
        }
    }

    // Helper method to filter items for only local items nearby user.
    private func rx_localItems(items: [Item]) -> Observable<[Item]> {
        return Observable.create { observable in
            observable.onNext(items.filter { [=13=].location == "LA" })
            observable.onCompleted()
            return Disposables.create()
        }
    }

如果我的 MVVM 或 RxSwift 或 API 设计方法有误,请批评指正。

你 运行 陷入了一个棘手的境地,因为你的 observable 可能会抛出错误,一旦它确实抛出错误,observable 序列就会出错,并且无法发出更多事件。因此,要处理后续的网络请求,您必须重新分配您当前采用的方法。但是,这通常不利于驱动 UI 元素,例如集合视图,因为您每次都必须绑定到重新分配的可观察对象。驱动 UI 元素时,您应该倾向于保证不会出错的类型(即 Variable 和 Driver)。您可以将 Observable<[Item]> 设置为 let items = Variable<[Item]>([]),然后您可以将该变量的值设置为来自新网络请求的项目数组。您可以使用 RxDataSources 或类似的东西安全地将此变量绑定到您的集合视图。然后你可以为错误消息创建一个单独的变量,比如 let errorMessage = Variable<String?>(nil),用于来自网络请求的错误消息,然后你可以将 errorMessage 字符串绑定到标签或类似的东西来显示你的错误消息.

我认为您遇到的问题是您只在可观察的范式中走了一半,这让您失望了。尝试一直坚持下去,看看是否有帮助。例如:

protocol ServerAPI {
    func rx_fetchItems(for category: ItemCategory) -> Observable<[Item]>
}

struct ViewModel {

    let collectionItems: Observable<[Item]>
    let error: Observable<Error>

    init(controlValue: Observable<Int>, api: ServerAPI) {
        let serverItems = controlValue
            .map { ItemCategory(rawValue: [=10=]) }
            .filter { [=10=] != nil }.map { [=10=]! } // or use a `filterNil` operator if you already have one implemented.
            .flatMap { api.rx_fetchItems(for: [=10=])
                .materialize()
            }
            .filter { [=10=].isCompleted == false }
            .shareReplayLatestWhileConnected()

        collectionItems = serverItems.filter { [=10=].element != nil }.dematerialize()
        error = serverItems.filter { [=10=].error != nil }.map { [=10=].error! }
    }
}

编辑以处理评论中提到的问题。您现在需要传入具有 rx_fetchItems(for:) 方法的对象。您应该有多个这样的对象:一个指向服务器,一个不指向任何服务器,而是 returns 固定数据,以便您可以测试任何可能的响应,包括错误。 (视图模型不应该直接与服务器对话,而应该通过中介来对话...

上面的秘诀是 materialize 运算符,它将错误事件包装到包含错误对象的正常事件中。这样你就可以阻止网络错误关闭整个系统。


针对您问题的变化...您可以简单地使 FirebaseAPI 符合 ServerAPI:

extension FirebaseAPI: ServerAPI {
    func rx_fetchItems(for category: ItemCategory) -> Observable<[Item]>  {
        // fetched items returns all items in database as Observable<[Item]>
        let fetchedItems = self.rx_fireRequest(.fetchItems, ofType: Item.self)
        switch category {
        case .Local:
            let localItems = fetchedItems
                .flatMapLatest { [weak self] (itemList) -> Observable<[Item]> in
                    return self!.rx_localItems(items: itemList)
            }

            return localItems

            // TODO: Handle other cases like RecentlyAdded, Trending, etc..
        }
    }

    // Helper method to filter items for only local items nearby user.
    private func rx_localItems(items: [Item]) -> Observable<[Item]> {
        return Observable.create { observable in
            observable.onNext(items.filter { [=11=].location == "LA" })
            observable.onCompleted()
            return Disposables.create()
        }
    }
}

此时您可能应该将 ServerAPI 的名称更改为类似 FetchItemsAPI 的名称。

我知道开始理解 RxSwift 很困难

我喜欢使用 Subjects 或 Variables 作为 ViewModelObservables 或 Driver 的输入s 作为 ViewModel

的输出

通过这种方式,您可以将 ViewController 上发生的操作绑定到 ViewModel,在那里处理逻辑,并更新输出

这是重构代码的示例

查看模型

// Inputs
let didSelectItemCategory: PublishSubject<ItemCategory> = .init()

// Outputs
let items: Observable<[Item]>

init() {
    let client = FirebaseAPI()

    let fetchedItems = client.rx_fireRequest(.fetchItems, ofType: Item.self)

    self.items = didSelectItemCategory
        .withLatestFrom(fetchedItems, resultSelector: { itemCategory, fetchedItems in
            switch itemCategory {
            case .Local:
                return fetchedItems.filter { [=10=].location == "Los Angeles" }
            default: return []
            }
        })
}

ViewController

segmentedControl.rx.value
    .map(ItemCategory.init(rawValue:))
    .startWith(.Local)
    .bind(to: viewModel.didSelectItemCategory)
    .disposed(by: disposeBag)

viewModel.items
    .subscribe(onNext: { items in
        // Do something
    })
    .disposed(by: disposeBag)