RxJava 流程:条件运算符和错误处理

RxJava flow: conditional operators and error handling

我是 RxJava 的新手,并尝试围绕具有三个要处理的异步方法的更复杂的登录逻辑进行思考。对我来说这是 "if I get this thing converted to RxJava, anything (tm) is possible" :)

所以我想做的是:

Call A -> (Process A) -> Call B with results of A -> (Process B) -\
                    \                                              -> Combine and Subscribe
                     \-> Call C with results of A -> (Process C) -/

现在的问题是 Call C 分支只应在特定条件下执行,否则不能执行(Combine and Subscribe 然后可以从该分支接收 NULL 值,这是好的)。

此外,错误处理非常重要:虽然 Call ACall C(如果它运行的话)需要通过 onError 将错误传递给最终订阅者, Call B的"success"是可选的,如果失败可以忽略。

这是我到目前为止的想法,它仍然忽略了 "C" 分支:

 mApi.callA(someArgs)
            // a transition operator to apply thread schedulers
            .compose(applySchedulers())
            // from rxlifecycle-components, unsubscribes automatically 
            // when the activity goes down  
            .compose(bindToLifecycle())
            // possibly other transformations that should work on the (error)
            // states of the first and the following, chained API calls
            .flatMap(response -> processA(response))
            .flatMap(response -> mApi.callB(response.token))
            .flatMap(response -> processB(response))
            // redirects HTTP responses >= 300 to onError()
            .lift(new SequenceOperators.HttpErrorOperator<>())
            // checks for application-specific error payload and redirects that to onError()
            .lift(new SequenceOperators.ApiErrorOperator<>())
            .subscribe(this::allDone this::failure);

我环顾了 Wiki for conditional operators,但找不到如何启动 Call C 分支的提示。

此外,我不确定我的 SequenceOperators 是否以这种方式解决,即可以放在链中所有请求之后,或者我是否需要其中几个,每个都放在 flatMap() 之后触发新 Call 的运算符。

有人能指出我正确的方向吗?

谢谢!

您应该使用 Zip 运算符 :) 结果应该如下所示:

mApi.callA(someArgs)
        // ...
        .flatMap(response -> processA(response))
        .flatMap(response -> {
              return Observable.zip(
                    callB(response),
                    callC(response),
                    (rA,rB) -> {
                          // or just return a new Pair<>(rA, rB)
                          return combineAPlusB(rA,rB)
                    }
              )
        })
        // ...
        .subscribe(this::allDone this::failure);