如何更改 Observable.interval 的周期,以及如何在运行时停止和恢复 Observable 滴答

How can I change the period for Observable.interval, and how can I stop and resume the Observable ticks at runtime

有没有办法在 运行 时间更改 Observable.interval 时间段? 有没有办法停止和恢复 Observable.interval 滴答声? 有没有办法重置间隔时间?

实际上我正在使用以下代码在一段时间内永远执行一个操作,但在 运行 期间我无法控制它,我必须停止、恢复、休息和在 运行 时间更改周期。

Observable.interval(8, TimeUnit.SECONDS).observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Observer<Long>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.i("TAG", "onSubscribe");
                        }

                        @Override
                        public void onNext(Long aLong) {
                            myMethod();
                        }

                        @Override
                        public void onError(Throwable e) {
                            Log.i("TAG", "onError");
                        }

                        @Override
                        public void onComplete() {
                            Log.i("TAG", "onComplete");
                        }
                    });

我已经尝试 google 它来寻找解决方案,但不幸的是我没有找到任何解决方案,如果有的话,我需要帮助或资源。

我不是 100% 确定“我必须在运行时停止、恢复、休息和更改周期”是什么意思。 但是您可以在运行时使用新的周期处理订阅并重新初始化可观察对象:

    Disposable d = Observable.interval(1, TimeUnit.SECONDS)
            .subscribeOn(Schedulers.single())
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {

                }
            });
    d.dispose();    

正如@DDH 所指出的,也许最简单的方法是取消正在进行的间隔并完全开始新的流程。

但是,如果出于某种原因必须将链保持在区间以下,则可以通过 switchMap 运算符切换到新的区间,例如由 PublishSubject 触发:

PublishSubject<Long> newInterval = PublishSubject.create();

newInterval.switchMap(currentPeriod ->
    Observable.interval(currentPeriod, TimeUnit.MILLISECONDS)
)
.doOnNext(v -> { /* background work */ })
.observeOn(AndroidSchedulers.mainThread())
.subscribe(/* ... */);

newInterval.onNext(1000L);

// sometime later

newInterval.onNext(200L);

这是我解决问题的方法

private PublishSubject<Long> newInterval;

// reactive programming using RXJava2
private void prepareObserver() {
    newInterval = PublishSubject.create();

    newInterval.switchMap(currentPeriod ->
            Observable.interval(currentPeriod, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.single())
    )
            .doOnNext(v -> runOnUiThread(this::pagerForeword))
            .subscribe();

    newInterval.onNext(period);
}

并重置我调用的间隔周期

newInterval.onNext(period);

您可以在以下资源教程中找到完整的解决方案 here

希望有用!