RxJava - Observable,它发出新的或更改的项目

RxJava - Observable which emits items which are new or changed

我是反应世界的新手,正在尝试使用 rxjava/rxandroid 2.x.

实现以下场景

我在应用程序 class 中有一个本地数据集作为 ArrayList mItems。相同的数据集与服务器同步,并在用户每次打开应用程序时更新。但是在服务器 returns 响应之前,我想在适配器支持的 RecycleView 中显示本地数据集。 响应一返回,适配器就应该用增量更新列表,而不打乱 UI.

中的顺序

到目前为止我试过这个:

public Observable<List<Item>> getItemsObservable() {
    Observable<List<Item>> observeApi = itemServiceAPI.getItemsForUser(accountId);
    if (mItems != null) {
        return Observable.just(mItems).mergeWith(observeApi);
    } else {
        return observeApi;
    }
}

要更新UI,上面的方法是这样调用的:

Observable<List<Item>> itemsObservable = appContext.getItemsObservable();
            itemsObservable.subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new DefaultObserver<List<Item>>() {
                        @Override
                        public void onNext(List<Item> Items) {
                            // Code to update the adapter
                        }

                        @Override
                        public void onError(Throwable e) {

                        }

                        @Override
                        public void onComplete() {

                        }
                    });

有了这个,我为每个本地数据集和远程数据集调用了两次 onNext。如何实现想要的功能?是否需要使用过滤运算符来排除项目​​?

实现此目标的最佳方法是什么?

更新

首先,你为什么要使用Observable.just(mItems)???没必要。

您的代码应该如下所示

itemServiceAPI.getItemsForUser(accountId)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new DefaultObserver<List<Item>>() {
                @Override
                public void onNext(List<Item> Items) {
                    // Code to update the adapter

                    mAdapter.updateItems(items);

                    /* method in adapter class
                    * 
                    *  public void updateItems(List<Item> mList) {
                              this.items.addAll(mList);
                              notifyDataSetChanged();
                          }
                    * */
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onComplete() {

                }
            });

此处,您的适配器将在 onNext 中更新。请确保在调用 API 之前,您必须使用本地项目设置您的适配器。

您可以使用 'startWith' 运算符:它首先订阅不同的 observable。 appContext.getItemsObservable() .startWith(localCacheObservable) .subscribe(adapter::updateData) 适配器的更新数据应该处理差异计算。