在RxAndroid(Observable)中添加subscription到disposable后,控制流结束,没有调用subscribe..!

After the subscription is added to disposable in RxAndroid (Observable), the control flow ends without calling subscribe..!

我正在尝试在 android 中使用 MVP、RX 和 Dagger 2。以下是代码流程,

LocalDataSource.java

@Singleton
public class LocalDataSource implements DataSource {

@Override
public Observable getServerSettings() {
return mDBHelper.createQuery(ServerSettingsEntry.TABLE_NAME,
DbUtils.getSelectAllQuery(ServerSettingsEntry.TABLE_NAME))
.mapToOne(DbUtils::getServerSettings);

Repository.java

@Singleton
public class MyRepository implements DataSource {
@Override
    public Observable<ServerSettings> getServerSettings() {
        return mLocalDataSource.getServerSettings().compose(RxUtils.applySchedulers());

LoginPresenter.java

public class LoginPresenter implements LoginActivityContract.Presenter{
 @Override
    public void checkServerDbSynced() {
        mCompositeDisposable.clear();
        Disposable subscription = mRepository
                .getServerSettings()
                .doOnSubscribe(disposable -> {
                    Timber.d(" onSubscribe");
                    mView.showLoadingIndicator(true, "Checking Server ....");
                })
                .subscribe(serverSettings -> {
                            if (serverSettings == null) {
                                Timber.d("*** Server Db Synced ****" + "\n" + "*** Checking Licence Key **** ");
                                checkLicenceKey();
                                } else {
                                Timber.d("*** Server Db Not Synced *** " + " \n" + "*** Opening Login Dialog ****");
                                mView.showLoadingIndicator(false, "Db Not Synced ....");
                                mView.openLoginDialog();
                                }
                                },
                        throwable -> {
                            mView.showErrorMessage(throwable.getLocalizedMessage());
                            });
        mCompositeDisposable.add(subscription);
    }
}



    @Override
    public void subscribe() {
        checkServerDbSynced();
    }

    @Override
    public void unSubscribe() {
        mCompositeDisposable.clear();
    }

问题是没有调用 subscribe 中的语句。调试时,我注意到在这一行之后 composite disposable.add(subscription);控制流程结束...请帮助...!!

编辑

public static ServerSettings getServerSettings(@NonNull Cursor cursor) {
ServerSettings s = new ServerSettings();
        s.setAndroidId("1234564453453463dfg");
                s.setDeviceId("tythyerju99");
                s.setIpAddress("6373792092.48949");
                s.setLicenceKey("fhfhdid");
                s.setExpiryDate("hshsh8ehd8");
                s.setId(2);
        Timber.d(" *** " +s.getExpiryDate()+" ****");
        return s;
    }

您的 doOnSubscribe 在执行 getServerSettings() 的同一线程上执行属于 Schedulers.io() 的内容。因此,您的 mView.showLoadingIndicator(true, "Checking Server ....") 正在尝试从主线程更新 UI(这是不允许的),因此此错误会终止您的 Observable 执行 - 您可能会在 [=16] 中看到它=] 打电话,但我不知道你到底在那里做什么。

您应该做的是从您的 doOnSubscribe 向主线程发送一条消息,如下所示:

...
            .doOnSubscribe(disposable -> {
                Timber.d(" onSubscribe");
                new Handler(Looper.getMainLooper()).post(() -> {
                    mView.showLoadingIndicator(true, "Checking Server ....");
                });
            })
...

更新

经过讨论,问题出在LocalDataSourcegetServerSettings方法上。具体来说,在 mapToOne 的用法中。当结果集为空时,就不会发出任何内容,这就是 subscribe 消费者方法未被调用的原因。为确保它不会发生,请使用 maptoOneOrDefault,如果结果集为空,它会发出指定的默认值。