RxJava - 操作员是任务还是整个链是任务?

RxJava - Is an operator a task or the whole chain a task?

我正在编写一些代码以将记录插入 Sqlite 数据库(如果 table 为空)。在插入任何数据之前,它会调用 Web 服务 LoveToDo.basecampClient().fetchMe() 来 return 一些数据。

我使用 SqlBrite 进行数据库访问,使用 Retrofit 进行 Web 访问。这是我的代码:

    Observable.just(LoveToDo.briteDatabase())
        .map(new Func1<BriteDatabase, Integer>() {
            @Override
            public Integer call(BriteDatabase briteDatabase) {
                Cursor cursor = briteDatabase.query("SELECT * FROM Accounts");

                try {
                    return cursor.getCount();

                } finally {
                    cursor.close();
                }
            }
        })
        .flatMap(new Func1<Integer, Observable<Person>>() {
            @Override
            public Observable<Person> call(Integer count) {
                if ( count == 0 ) {
                    return LoveToDo.basecampClient().fetchMe();
                }

                return null;
            }
        })
        .map(new Func1<Person, Boolean>() {
            @Override
            public Boolean call(Person person) {
                if ( person == null ) return false;

                BriteDatabase database = LoveToDo.briteDatabase();

                long count = database.insert(Account.TABLE, new Account.Builder()
                    .accountId(Settings.accountId)
                    .userName(Settings.userName)
                    .password(Settings.password)
                    .agent(Settings.agent)
                    .personId(person.id)
                    .build()
                );

                return count > 0;
            }
        })

        .subscribeOn(Schedulers.io())
        .observeOn( Schedulers.io() )

        .subscribe();

不用说,我不认为这是很棒的代码。我想做的是找出如何将这段代码转换成好的东西。因此,让我们使用它并挑剔它的可怕之处。

首先,我是否应该将数据库和网络服务调用操作结合在一个操作员中。例如:

    Observable.just(LoveToDo.briteDatabase())
        .flatMap(new Func1<BriteDatabase, Observable<Person>>() {
            @Override
            public Observable<Person> call(BriteDatabase briteDatabase) {
                Cursor cursor = briteDatabase.query("SELECT * FROM Accounts");

                int count;
                try {
                    count = cursor.getCount();

                } finally {
                    cursor.close();
                }

                if ( count == 0 ) {
                    return LoveToDo.basecampClient().fetchMe();
                }

                return null;
            }
        })
        .map(new Func1<Person, Boolean>() {
            @Override
            public Boolean call(Person person) {
                if ( person == null ) return false;

                BriteDatabase database = LoveToDo.briteDatabase();

                long count = database.insert(Account.TABLE, new Account.Builder()
                        .accountId(Settings.accountId)
                        .userName(Settings.userName)
                        .password(Settings.password)
                        .agent(Settings.agent)
                        .personId(person.id)
                        .build()
                );

                return count > 0;
            }
        })

        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.io())

        .subscribe();

或者是否有充分的理由将此类操作隔离在链中?

第二个让我烦恼的事情是这是一个后台操作 - 没有用户界面会因为这段代码而直接更新。这就是为什么有一个无参数的 subscribe() 函数调用。但是当出现异常时会发生什么?这是否意味着我必须执行以下操作?

        .subscribe(new Action1<Boolean>() {
            @Override
            public void call(Boolean aBoolean) {
                // Do nothing
            }
        }, new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                // Do something with the exception
            }
        });

顺便问一下,当 observeOn 设置为后台线程时,我需要 subscribeOn 吗?

第三,链是用一个SqlBrite观察器启动的。在链的后面,我再次需要 SqlBrite,所以我使用单例访问它 LoveToDo.briteDatabase()。这是一个坏主意吗?有更好的方法吗?

最后,有什么办法可以break;上链吗?如果我可以放弃我正在做的事情而不是在每一步检查丢失的数据,那就太好了

我看到很多问题。

  1. 每个 method/operator 都是一个 "task",它将 运行 基于前面的项目并将项目发送给下一个运算符。
  2. 为了减少代码的冗长,我们通常在 RxJava 中使用 Retrolambda or Gradle Retrolamda。如果你不想使用 Retolambda,你可以创建一个 class NameModel,它包含从 Observable 创建到 subscribe() 之前的所有逻辑。那里有所有需要的逻辑,孤立的。
  3. 如果你有一个网络调用,那么在订阅中总是有一个 onError Func 是个好主意,除非你在某个地方捕获了所有可能的错误,例如与 onErrorReturn。如果出现问题,onError 可以帮助您,例如通知用户。在订阅中而不是在链内更新某些内容也是一种很好的做法,从而隔离操作员的内容。
  4. subscribeOn 使进程处于后台,而不是 observeOn。所以不,如果您不更改线程,则不需要 observeOn
  5. "break" 链的最佳方法是抛出错误,或者更复杂的是使用自定义 .lift() 运算符和自定义订阅者从内部取消订阅链。

根据评论更新:

从第二个上面的 2 但我更喜欢这样的东西:

Observable.just(LoveToDo.briteDatabase())
        .flatMap(briteDatabase -> {
            Cursor cursor = briteDatabase.query("SELECT * FROM Accounts");

            int count;
            try {
                count = cursor.getCount();

            } finally {
                cursor.close();
            }

            if (count == 0) {
                return LoveToDo.basecampClient().fetchMe()
                        .map(person -> insertPerson(person, briteDatabase));
            }

            // if you want to track the account creation
            return just(false);
        })
        .subscribeOn(Schedulers.io())
        .subscribe(personInserted -> {
            // do something if the person was created or not
        }, e -> {
        });


private Boolean insertPerson(Person person, BriteDatabase briteDatabase) {
    long count = briteDatabase.insert(Account.TABLE, new Account.Builder()
            .accountId(Settings.accountId)
            .userName(Settings.userName)
            .password(Settings.password)
            .agent(Settings.agent)
            .personId(person.id)
            .build());

    return count > 0;
}