RxJava 异步订阅会丢失消息
RxJava async subscribe will lost message
我正在学习 RxJava 异步订阅的工作原理。但是有些问题让我很困惑。
@Test public void testCreateAsync() throws InterruptedException {
Observable<String> observable = Observable.create(emitter -> {
for (int i = 0; i < 10; i++) {
if (!emitter.isDisposed()) {
int finalI = i;
new Thread(() -> emitter.onNext("value_" + finalI)).start();
}
}
if (!emitter.isDisposed()) {
emitter.onComplete();
}
});
observable.subscribe(System.out::println);
}
上面的代码工作正常,打印 value_1
到 value_9
。但是当我在订阅前添加一个睡眠时,最后一条消息value_9
不会被打印出来,像这样:
@Test public void testCreateAsync() throws InterruptedException {
...
Thread.sleep(3000);
observable.subscribe(System.out::println);
}
感谢任何关于此问题的讨论。
ps: java版本为1.8
,RxJava版本为2.1.1
.
有两个问题:
- 同时从不同线程调用
onNext
onComplete
可能会在之前的线程能够发出它们的值之前执行,这会阻止任何进一步的 onNext
排放。
睡眠没有特别的影响,重复执行上面的代码有时会产生这种行为。
(我不清楚你想通过此设置实现什么)。
我正在学习 RxJava 异步订阅的工作原理。但是有些问题让我很困惑。
@Test public void testCreateAsync() throws InterruptedException {
Observable<String> observable = Observable.create(emitter -> {
for (int i = 0; i < 10; i++) {
if (!emitter.isDisposed()) {
int finalI = i;
new Thread(() -> emitter.onNext("value_" + finalI)).start();
}
}
if (!emitter.isDisposed()) {
emitter.onComplete();
}
});
observable.subscribe(System.out::println);
}
上面的代码工作正常,打印 value_1
到 value_9
。但是当我在订阅前添加一个睡眠时,最后一条消息value_9
不会被打印出来,像这样:
@Test public void testCreateAsync() throws InterruptedException {
...
Thread.sleep(3000);
observable.subscribe(System.out::println);
}
感谢任何关于此问题的讨论。
ps: java版本为1.8
,RxJava版本为2.1.1
.
有两个问题:
- 同时从不同线程调用
onNext
onComplete
可能会在之前的线程能够发出它们的值之前执行,这会阻止任何进一步的onNext
排放。
睡眠没有特别的影响,重复执行上面的代码有时会产生这种行为。
(我不清楚你想通过此设置实现什么)。