RxJava2 由于分支逻辑而丢失结果

RxJava2 losing result due to branching logic

我有某种从服务器获得的结果。我正在尝试构建一个可观察的序列,它将: 1.检查结果是否成功(result.success()) 2. 如果是,则从结果中获取字符串资源 (result.getResource()) 和 return 具有该字符串的可观察对象 3. 如果不是,则执行一些故障处理代码和 return 一个空的 observable

我可以通过一个简单的 switchMap 轻松实现这一点,但是它包含我不喜欢的副作用(故障处理代码),因为我想尽可能将不同的操作分开。理想情况下,我想在单独的使用者中进行故障处理,但问题是我无法在不丢失 Result 对象的情况下传递 .success() 方法的结果。有没有办法实现我的需要?

现在,我的代码看起来像这样:

Observable<String> getResource() {
            return getServerResult()
            .switchMap(new Function<Result, ObservableSource<String>>() {
                @Override
                public ObservableSource<String> apply(Result result) throws Exception {
                    if (result.success()) {
                        return Observable.just(result.getResource());
                    } else {
                        onFailure(result); // not happy about this side effect being here
                        return Observable.empty();
                    }
                }
            });

您可以将结果存储在自定义 Exception 中,将其包装到流错误中,如果结果不成功,则 return 在 flatMap 中调用副作用使用 doOnError 的方法并将其转换回 Observable in ErrorResumeNext:

class MyCustomException extends Exception {
    Result result;
}

//do you really need switchMap?
.flatMap(result -> {
    if (result.success()) {
        return Observable.just(result.getResource());
    } else {
        return Observable.error(MyCustomException(result));
    }
})
//do* operators is meant to call side effect methods without changing the stream
.doOnError(exception -> { 
    if(exception instanceof MyCustomException) {
        Result result = ((MyCustomException) exception).getResult();
        onFailure(result);
    }
})
.onErrorResumeNext(exception -> {
    if(exception instanceof MyCustomException) {
        return Observable.empty();
    } else {
        return Observable.error(exception); //propagate error downstream
    }
})