Async Observable 内部的 Async Observable

Async Observable inside Async Observable

我遇到了一个小问题,我正在使用 RxJava 从服务器检索数据,然后将这些数据与本地的数据进行核对,问题是我开始检查本地的文件并存储数据,然后我启动 observable我在数据完全下载之前获得成功,然后我在成功后收到 "initRemoteData" 结果。

所以我需要做的是在调用 "InitRemoteData" 时阻止可观察对象,当可观察对象给出我继续的结果时。

private Observable<CheckStatus> getCheckObservable() {
    return Observable.defer(() -> {
        DataDTO dto;
        try {
            dto = Utils.decryptData(
                localfile.getNonce(),
                localfile.getEncryptedData(),
                password);
        } catch (WrongPasswordException e) {
            return Observable.just(CheckStatus.WRONG_PASSWORD);
        }
        try {
            storeDataPrefs(dto);
        } catch (RuntimeException e) {
            return Observable.just(CheckStatus.OTHER_ERROR);
        }
        storeDatabase(dto);

        initRemoteData();//<-Here i did call for another observable but the observable keep going without waiting it to finish.



        return Observable.just(CheckStatus.SUCCESS);
    });
}

可观察调用:

getCheckObservable()
        .subscribeOn(BackgroundSchedulers.getMultiThreadInstance())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(Subscribers.create(checkResult -> replaceFragment(
            CheckAccountCompleteFragment
                .newInstance(checkResult),
            AddToBackStack.FALSE)));

更新: initRemoteData 代码:

Observable.zip(getObservable1()),//remote Data
            getObservable2(), // local Data
            (observableResult1,observableResult2)->{
                doSomethingWith(observableResult1,observableResult2); // compare between data
                return null;
            }).subscribeOn(BackgroundSchedulers.getMultiThreadInstance())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnCompleted(() -> {
                    doSomething2(); // update fields
                }
            })
            .subscribe();

如果我猜对了,您需要让 getCheckObservable() 等待两个结果?如果是:Observable.combineLatest()可以帮到你。

更新: 好吧,那么为什么不只是 return 这个 "waiting" Observable in "big" 一个 像这样:

Observable.zip(getObservable1()),//remote Data
        getObservable2(), // local Data
        (observableResult1,observableResult2)->{
            doSomethingWith(observableResult1,observableResult2); // compare between data

            return CheckStatus.SUCCESS;   // <-- CHANGES

        }).subscribeOn(BackgroundSchedulers.getMultiThreadInstance())
        .observeOn(AndroidSchedulers.mainThread())
        .doOnCompleted(() -> {
                doSomething2(); // update fields
            }
        });

private Observable<CheckStatus> getCheckObservable() {
return Observable.defer(() -> {
    DataDTO dto;
    try {
        dto = Utils.decryptData(
            localfile.getNonce(),
            localfile.getEncryptedData(),
            password);
    } catch (WrongPasswordException e) {
        return Observable.just(CheckStatus.WRONG_PASSWORD);
    }
    try {
        storeDataPrefs(dto);
    } catch (RuntimeException e) {
        return Observable.just(CheckStatus.OTHER_ERROR);
    }
    storeDatabase(dto);

    return initRemoteData();       // <-- CHANGES
});
}