RxJava。 Observable.delay 工作奇怪(最后缺少一些项目)
RxJava. Observable.delay work strange (lacks some items at the end)
我正在尝试了解 RxJava。我的测试代码是:
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action1;
import java.util.concurrent.TimeUnit;
public class Hello {
public static void main(String[] args) {
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
try {
Thread.sleep(1000);
subscriber.onNext("a");
Thread.sleep(1000);
subscriber.onNext("b");
Thread.sleep(1000);
subscriber.onNext("c");
Thread.sleep(1000);
subscriber.onNext("d");
Thread.sleep(1000);
subscriber.onNext("e");
Thread.sleep(1000);
subscriber.onNext("f");
Thread.sleep(1000);
subscriber.onNext("g");
Thread.sleep(1000);
subscriber.onNext("h");
} catch (InterruptedException e) {
subscriber.onError(e);
}
}
});
observable
.delay(2, TimeUnit.SECONDS)
.subscribe(new Action1<String>() {
@Override
public void call(String string) {
System.out.println(string);
}
});
}
}
没有.delay(2, TimeUnit.SECONDS)
我有输出:a
b
C
d
电子
F
G
H
但是 .delay(2, TimeUnit.SECONDS)
输出缺少 "g" 和 "h":
一种
b
C
d
电子
f
怎么可能?文档说 delay 只是发出由源 Observable 发出的项目,按指定的 delay
及时向前移动
您正在使用的 delay
重载计划在不同的线程上工作,并导致隐式竞争 condition.All 时间运算符(例如 delay
、buffer
、和 window
) 需要使用调度程序来安排稍后的效果,如果您没有意识到并小心使用它们,这可能会导致意外的竞争条件。在这种情况下,延迟运算符将下游工作安排在单独的线程池上。这是您测试中的执行顺序(在主线程上)。
- 您的 Observable 已订阅并在
onNext("a")
之前等待 1000 毫秒
- 接下来是延迟接收。这会在 2 秒后安排下游 onNext。
- 控制流 returns 立即到等待 1000 毫秒的可观察对象。
- 可观察到
onNext("b")
延迟。 Delay 将 "b" 的 onNext 安排在 2 秒后。
- .....(重复)
- 当您的 observable 调用
onNext("h")
时,它会安排工作,然后立即 returns 订阅并终止您的测试(导致安排的工作消失)。
为了让它异步执行,您可以在 trampoline 调度程序实现上安排延迟。
.delay(2, TimeUnit.SECONDS, Schedulers.trampoline())
我正在尝试了解 RxJava。我的测试代码是:
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action1;
import java.util.concurrent.TimeUnit;
public class Hello {
public static void main(String[] args) {
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
try {
Thread.sleep(1000);
subscriber.onNext("a");
Thread.sleep(1000);
subscriber.onNext("b");
Thread.sleep(1000);
subscriber.onNext("c");
Thread.sleep(1000);
subscriber.onNext("d");
Thread.sleep(1000);
subscriber.onNext("e");
Thread.sleep(1000);
subscriber.onNext("f");
Thread.sleep(1000);
subscriber.onNext("g");
Thread.sleep(1000);
subscriber.onNext("h");
} catch (InterruptedException e) {
subscriber.onError(e);
}
}
});
observable
.delay(2, TimeUnit.SECONDS)
.subscribe(new Action1<String>() {
@Override
public void call(String string) {
System.out.println(string);
}
});
}
}
没有.delay(2, TimeUnit.SECONDS)
我有输出:a
b
C
d
电子
F
G
H
但是 .delay(2, TimeUnit.SECONDS)
输出缺少 "g" 和 "h":
一种
b
C
d
电子
f
怎么可能?文档说 delay 只是发出由源 Observable 发出的项目,按指定的 delay
及时向前移动您正在使用的 delay
重载计划在不同的线程上工作,并导致隐式竞争 condition.All 时间运算符(例如 delay
、buffer
、和 window
) 需要使用调度程序来安排稍后的效果,如果您没有意识到并小心使用它们,这可能会导致意外的竞争条件。在这种情况下,延迟运算符将下游工作安排在单独的线程池上。这是您测试中的执行顺序(在主线程上)。
- 您的 Observable 已订阅并在
onNext("a")
之前等待 1000 毫秒
- 接下来是延迟接收。这会在 2 秒后安排下游 onNext。
- 控制流 returns 立即到等待 1000 毫秒的可观察对象。
- 可观察到
onNext("b")
延迟。 Delay 将 "b" 的 onNext 安排在 2 秒后。 - .....(重复)
- 当您的 observable 调用
onNext("h")
时,它会安排工作,然后立即 returns 订阅并终止您的测试(导致安排的工作消失)。
为了让它异步执行,您可以在 trampoline 调度程序实现上安排延迟。
.delay(2, TimeUnit.SECONDS, Schedulers.trampoline())