从网络加载,在 UI 上显示,并使用 RxJava 保存到磁盘

Loading from network, showing on UI, and save to disk with RxJava

我现在正在执行一项任务,该任务将从网络中获取数据,显示在 UI 上,并将数据保存到本地数据库以备后用。目前我正在做以下事情:

//Cloud loading
Observable<MyModel> loadDataFromCloud();

//Database
Observable<MyModel> saveDataToDisk(MyModel myModel);

//Load data
loadDataFromCloud()
  .subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .flatMap(new Func1<MyModel, Observable<MyModel>>() {
    @Override
    public Observable<MyModel> call(MyModel myModel) {
      return saveDataToDisk(myModel);
    }
  })
  .subscribe(new Subscriber<MyModel>() {
    @Override
    public void onCompleted() {

    }

    @Override
    public void onError(Throwable e) {
      //exception handling
    }

    @Override
    public void onNext(MyModel myModel) {
      //bind myModel to UI
    }
  });

这种方法工作正常,但由于数据模型变得更复杂,数据库变得更大,它不再有效,因为所有数据库事务现在都在主线程上执行并且 UI 需要等到交易完成后才有数据。

我现在改用这种新方法:

//A Singleton
Scheduler scheduler = Schedulers.from(Executors.newSingleThreadExecutor());

//Load data
loadDataFromCloud()
  .subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .flatMap(new Func1<MyModel, Observable<MyModel>>() {
    @Override
    public Observable<MyModel> call(MyModel myModel) {
      try {
        //bind myModel to UI
        return Observable.just(s);
      } catch(Exception e) {
        return Observable.error(e)
      }
    }
  })
  .observeOn(scheduler)
  .flatMap(new Func1<MyModel, Observable<MyModel>>() {
    @Override
    public Observable<MyModel> call(MyModel myModel) {
      return saveDataToDisk(myModel);
    }
  })
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(new Subscriber<MyModel>() {
    @Override
    public void onCompleted() {

    }

    @Override
    public void onError(Throwable e) {
      //exception handling
    }

    @Override
    public void onNext(MyModel myModel) {
      //do nothing
    }
  });

我正在使用 Schedulers.from(Executors.newSingleThreadExecutor()) 来确保所有数据库事务都是线程安全的。

我有一些问题:

  1. 使用Schedulers.from(Executors.newSingleThreadExecutor())确保所有数据库事务都是线程安全的是正确的方法吗?
  2. 有没有更好的方法来实现这个目标,因为上次onNext()什么都不做似乎是不明智的
  3. 使用 RxJava 我们可以在两个不同的线程上同时开始执行两个任务(显示在 UI 上并保存到磁盘)

我搜索了一段时间,找到了一些文章,例如 this one,但它们并没有直接解决我的问题。

首先,首先是在主线程上进行 IO 操作,这是一种不好的做法,在 android 中它可能会导致错误。

  1. 基本上它应该同步,但只要您在整个应用程序中使用相同的 Scheduler,否则您将为每个订阅此可观察对象创建新线程。但是,这样做有点奇怪,因为数据库操作通常是线程安全的。
  2. 在你的场景中,你想显示ui并且你可以并行saveDataToDisk()而不等待它,所以你可以使用doOnNext运算符来启动保存操作一个不同的可观察对象(你不关心它的结果)而不观察它的结果。

总而言之,您应该是这样的:

loadDataFromCloud()
   .doOnNext(data - > saveDataToDisk().subscribeOn(Schedulers.io()))
   .subscribeOn(Schedulers.io())
   .observeOn(AndroidSchedulers.mainThread())
   .subscribe(data -> //uodate your ui) 

Dan Lew post 是一个很好的例子,您应该遵循那里的例子。