如何连续执行多个 RxJava2 flux

How to execute many RxJava2 flux in a row

我正在自我介绍 RxJava2,但我觉得我做错了什么。就我而言,我想执行一些以下异步操作。

在这个例子中,第一个动作是检查设备是否已连接(wifi 或数据,让我们承认这需要时间),然后我想连接到 api 然后我想做获取列表(可观察)然后使用它的 http 调用。如果这些操作之一失败,则应在订阅中引发和处理 onError 或异常。

我有这个有效的代码:

Single.create((SingleEmitter<Boolean> e) -> e.onSuccess(Connectivity.isDeviceConnected(MainActivity.this)) )
    .subscribeOn(Schedulers.io())
    .flatMap(isDeviceConnected -> {
        Log.i("LOG", "isDeviceConnected : "+ isDeviceConnected);
        if(!isDeviceConnected)
            throw new Exception("whatever"); // TODO : Chercher vrai erreur

        return awRepository.getFluxAuthenticate(host, port, user, password); // Single<DisfeApiAirWatch>
    })
    .toObservable()
    .flatMap(awRepository::getFluxManagedApps)  // List of apps : Observable<AirwatchApp>

    .observeOn(AndroidSchedulers.mainThread())
    .doFinally(this::hideProgressDialog)
    .subscribe(
            app -> Log.i("LOG", "OnNext : "+ app),
            error -> Log.i("LOG", "Error : " + error),
            () -> Log.i("LOG", "Complete : ")
);

但是为一个简单的 "if" 发出布尔值的单身人士听起来是错误的。 Completable 似乎更合乎逻辑(工作与否,继续或停止)。我尝试使用以下代码,但它不起作用。

Completable.create((CompletableEmitter e) -> {
    if(Connectivity.isDeviceConnected(MainActivity.this))
        e.onComplete(); // Guess not good, should call the complete of subscribe ?
    else
        e.onError(new Exception("whatever"));
} ).toObservable()
    .subscribeOn(Schedulers.io())
    .flatMap(awRepository.getFluxAuthenticate(host, port, user, password)) //Single<DisfeApiAirWatch>
    .toObservable()
    .flatMap(awRepository::getFluxManagedApps) // List of apps : Observable<AirwatchApp>

    .observeOn(AndroidSchedulers.mainThread())
    .doFinally(this::hideProgressDialog)
    .subscribe(
            app -> Log.i("LOG", "OnNext : "+ app),
            error -> Log.i("LOG", "Error : " + error),
            () -> Log.i("LOG", "Complete : ")
);

如何使此代码生效?

我知道我可以先订阅 complatable,然后在 "onSuccess" 中编写另一个 flux / 其余代码。但我不认为堆栈在彼此内部流动是一个好的解决方案。

此致

Completable 没有值,因此永远不会调用 flatMap。您必须使用 andThen 并将身份验证成功值作为后续 flatMap:

的输入
Completable.create((CompletableEmitter e) -> {
    if(Connectivity.isDeviceConnected(MainActivity.this))
        e.onComplete();
    else
       e.onError(new Exception("whatever"));
})
.subscribeOn(Schedulers.io())
.andThen(awRepository.getFluxAuthenticate(host, port, user, password)) // <-----------
.flatMapObservable(awRepository::getFluxManagedApps)
.observeOn(AndroidSchedulers.mainThread())
.doFinally(this::hideProgressDialog)
.subscribe(
        app -> Log.i("LOG", "OnNext : "+ app),
        error -> Log.i("LOG", "Error : " + error),
        () -> Log.i("LOG", "Complete : ")

);