RxJava/Android 监控在不同时间触发的多个订阅者的进度

RxJava/Android monitor progress of multiple subscribers fired at different times

我正在寻找一种方法,希望使用 RxJava 来保持一致性,以监视可能在不同时间触发的多个订阅者的进度。我知道如何将订阅者合并或 flatMap 在一起,当它们都被一种方法触发时,但我不知道当它们在不同时间被不同方法触发时如何做到这一点。

例如,如果我有 2 个长 运行 任务附加到按钮按下。我按下按钮 1 并启动 observable/subscriber,在 运行 进行到一半时,我按下按钮 2 启动第二个 observable/subscriber。

我想在没有任务 运行 时启用按钮,并在一个或多个任务 运行 时禁用它。

这可能吗?我也试图避免设置实例变量标志。

我会使用单独的 BehaviorSubjectscan 来监视执行状态。这与实例变量非常相似,但它可能会启发您找到更好的解决方案。像这样:

private final BehaviorSubject<Integer> mProgressSubject = BehaviorSubject.create(0);

public  Observable<String> firstLongRunningOperations() {
    return Observable.just("First")
            .doOnSubscribe(() -> mProgressSubject.onNext(1))
            .finallyDo(() -> mProgressSubject.onNext(-1)));
}

public  Observable<String> secondLongRunningOperations() {
    return Observable.just("Second")
            .doOnSubscribe(() -> mProgressSubject.onNext(1))
            .finallyDo(() -> mProgressSubject.onNext(-1));
}

public Observable<Boolean> isOperationInProgress() {
    return mProgressSubject.asObservable()
            .scan((sum, item) -> sum + item)
            .map(sum -> sum > 0);
}

用法是这样的:

isOperationInProgress()
        .subscribe(inProgress -> {
            if (inProgress) {
                //disable controls
            } else {
                //enable controls
            }
        });

使用这种方法,您可以进行任意数量的长 运行 操作,而不必将它们全部触发。只是不要忘记调用 doOnSubscribefinallyDo.

PS。抱歉,我没有测试,但应该可以。

为了使这成为可能,让两个长 运行 操作在 PublishSubject 上发出 onNext 事件。将两个主题与 zip 或 combineLatest 函数结合起来并订阅它。一旦 combine 函数接收到事件,这意味着两个 Subjects 都发出了一个 onNext 事件,因此两个长 运行 操作都已完成,您可以启用第三个按钮。

private PublishSubject<Boolean> firstSubject = PublishSubject.create();
private PublishSubject<Boolean> secondSubject = PublishSubject.create();

@Override
public void onStart() {
    super.onStart();
    subscribeToResult();
}

private Observable<Integer> firstOperation() { 
    return Observable.just(100)
                .delay(1000) // takes a while
                .subscribe(tick -> firstSubject.onNext(true));
}

private Observable<Integer> firstOperation() { 
    return Observable.just(200)
                .delay(1000) // takes a while
                .subscribe(tick -> secondSubject.onNext(true));
}

private void subscribeToResult() {
    Observable.zip(
        firstSubject,
        secondSubject,
        (firstResult, secondResult) -> return true
    ).subscribe(
        tick -> thirdButton.setEnabled(true)
    )
}

一定要看一看RxJava的combine函数。