Interval-scheduled RxJava observables 花费比指定时间更多的时间
Interval-scheduled RxJava observables take more time than specified
在下面的代码中,observable 应该每 300 毫秒触发一次。我通过模拟需要 1 秒的背景 activity 来增加它的趣味性。我期望因为我使用的调度程序在下面使用线程池,所以可观察到的间隔将每 300 毫秒持续触发一个新线程。取而代之的是,interval observable 每次都等待整整一秒,然后再次触发。这是期望的行为吗?如果一项任务花费的时间超过请求的时间,如何强制它并行启动?
代码如下:
Observable
.interval(300, TimeUnit.MILLISECONDS, Schedulers.io())
.doOnNext(new Action1<Long>() {
@Override
public void call(Long aLong) {
System.out.println("action thread: " + Thread.currentThread());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
})
.map(new Func1<Long, Float>() {
@Override
public Float call(Long aLong) {
final double result = Math.random();
return new Float(result);
}
})
.takeWhile(new Func1<Float, Boolean>() {
@Override
public Boolean call(Float aFloat) {
return aFloat >= 0.01f;
}
})
.subscribe(new Action1<Float>() {
@Override
public void call(Float aFloat) {
System.out.println("observing thread: " + Thread.currentThread());
System.out.println(aFloat);
}
});
Observables 本质上是连续的,所以如果你像在例子中那样进入睡眠状态,你就会阻塞整个序列。要进行后台计算,您必须通过 observeOn
或 subscribeOn
将其移动到另一个线程。在这种情况下,您可以 flatMap
/concatMapEager
在另一个执行休眠的可观察对象中并将结果合并回主序列:
Observable.interval(300, TimeUnit.MILLISECONDS)
.flatMap(t -> Observable.fromCallable(() -> {
Thread.sleep(1000);
}).subscribeOn(Schedulers.io()))
.map(...)
.takeWhile(...)
.subscribe(...)
在下面的代码中,observable 应该每 300 毫秒触发一次。我通过模拟需要 1 秒的背景 activity 来增加它的趣味性。我期望因为我使用的调度程序在下面使用线程池,所以可观察到的间隔将每 300 毫秒持续触发一个新线程。取而代之的是,interval observable 每次都等待整整一秒,然后再次触发。这是期望的行为吗?如果一项任务花费的时间超过请求的时间,如何强制它并行启动?
代码如下:
Observable
.interval(300, TimeUnit.MILLISECONDS, Schedulers.io())
.doOnNext(new Action1<Long>() {
@Override
public void call(Long aLong) {
System.out.println("action thread: " + Thread.currentThread());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
})
.map(new Func1<Long, Float>() {
@Override
public Float call(Long aLong) {
final double result = Math.random();
return new Float(result);
}
})
.takeWhile(new Func1<Float, Boolean>() {
@Override
public Boolean call(Float aFloat) {
return aFloat >= 0.01f;
}
})
.subscribe(new Action1<Float>() {
@Override
public void call(Float aFloat) {
System.out.println("observing thread: " + Thread.currentThread());
System.out.println(aFloat);
}
});
Observables 本质上是连续的,所以如果你像在例子中那样进入睡眠状态,你就会阻塞整个序列。要进行后台计算,您必须通过 observeOn
或 subscribeOn
将其移动到另一个线程。在这种情况下,您可以 flatMap
/concatMapEager
在另一个执行休眠的可观察对象中并将结果合并回主序列:
Observable.interval(300, TimeUnit.MILLISECONDS)
.flatMap(t -> Observable.fromCallable(() -> {
Thread.sleep(1000);
}).subscribeOn(Schedulers.io()))
.map(...)
.takeWhile(...)
.subscribe(...)