如何组合两个几乎相似的 RxJava 方法?

How to combine two almost similar RxJava methods?

我有两个 RxJava 风格的方法:

void requestFirst() {
    Observable
        .combineLatest(
            api.requestAPI1(),
            api.requestAPI2(),
            (result1, result2) -> result1)
        .map(result -> processResult(result))
        .observeOn(AndroidSchedulers.mainThread())
        .doOnSubscribe(etc1())
        .doOnSubscribe(etc2())
        .subscribe(
            result -> onComplete(result),
            e -> onError(e));
}

void requestNext() {
    api.requestAPI1()
        .map(result -> processResult(result))
        .observeOn(AndroidSchedulers.mainThread())
        .doOnSubscribe(etc1())
        .doOnSubscribe(etc2())
        .subscribe(
            result -> onComplete(result),
            e -> onError(e));
}

除了combineLatest,两种方法几乎相同, 我想合并它进行重构。 所以我写了下面的代码:

void request(boolean isFirst) {
    Observable
        .combineLatest(
            api.requestAPI1(),
            (isFirst) ? api.requestAPI2() : Observable.empty(),
            (result1, result2) -> result1)
        .map(result -> processResult(result))
        .observeOn(AndroidSchedulers.mainThread())
        .doOnSubscribe(etc1())
        .doOnSubscribe(etc2())
        .subscribe(
            result -> onComplete(result),
            e -> onError(e));
}

但是我认为combineLatest方法的第二个参数不是Rx的风格。 我如何通过 Rx 的风格合并这两个相似的方法? 谢谢。

您可以添加一个新方法:

void processResults(Observable<Source> dataSource) {
    results.map(result -> processResult(result))
    .observeOn(AndroidSchedulers.mainThread())
    .doOnSubscribe(etc1())
    .doOnSubscribe(etc2())
    .subscribe(
        result -> onComplete(result),
        e -> onError(e));
}

并将其用作:

void requestNext() {
  processResults(api.requestAPI1());
}

使用运算符.merge()

final Observable tworequestsObservable = Observable
    .combineLatest(
        api.requestAPI1(),
        api.requestAPI2(),
        (result1, result2) -> result1)

void requestNext() {
    Observable.merge(api.requestAPI1(), tworequestsObservable)
        .map(result -> processResult(result))
        .observeOn(AndroidSchedulers.mainThread())
        .doOnSubscribe(etc1())
        .doOnSubscribe(etc2())
        .subscribe(
            result -> onComplete(result),
            e -> onError(e));
}

并且您可以在 merge 之后添加 .distinctUntilChange() 以避免双重处理 api.requestAPI1() 结果。