Interval-scheduled RxJava observables 花费比指定时间更多的时间

Interval-scheduled RxJava observables take more time than specified

在下面的代码中,observable 应该每 300 毫秒触发一次。我通过模拟需要 1 秒的背景 activity 来增加它的趣味性。我期望因为我使用的调度程序在下面使用线程池,所以可观察到的间隔将每 300 毫秒持续触发一个新线程。取而代之的是,interval observable 每次都等待整整一秒,然后再次触发。这是期望的行为吗?如果一项任务花费的时间超过请求的时间,如何强制它并行启动?

代码如下:

Observable
            .interval(300, TimeUnit.MILLISECONDS, Schedulers.io())
                .doOnNext(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                        System.out.println("action thread: " + Thread.currentThread());
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                })
                .map(new Func1<Long, Float>() {
                    @Override
                    public Float call(Long aLong) {
                        final double result = Math.random();
                        return new Float(result);
                    }
                })
                .takeWhile(new Func1<Float, Boolean>() {
                    @Override
                    public Boolean call(Float aFloat) {
                        return aFloat >= 0.01f;
                    }
                })
            .subscribe(new Action1<Float>() {
                @Override
                public void call(Float aFloat) {
                    System.out.println("observing thread: " + Thread.currentThread());
                    System.out.println(aFloat);
                }
            });

Observables 本质上是连续的,所以如果你像在例子中那样进入睡眠状态,你就会阻塞整个序列。要进行后台计算,您必须通过 observeOnsubscribeOn 将其移动到另一个线程。在这种情况下,您可以 flatMap/concatMapEager 在另一个执行休眠的可观察对象中并将结果合并回主序列:

Observable.interval(300, TimeUnit.MILLISECONDS)
.flatMap(t -> Observable.fromCallable(() -> { 
        Thread.sleep(1000);
     }).subscribeOn(Schedulers.io()))
.map(...)
.takeWhile(...)
.subscribe(...)