RxJava 异步订阅会丢失消息

RxJava async subscribe will lost message

我正在学习 RxJava 异步订阅的工作原理。但是有些问题让我很困惑。

@Test public void testCreateAsync() throws InterruptedException {
  Observable<String> observable = Observable.create(emitter -> {
    for (int i = 0; i < 10; i++) {
      if (!emitter.isDisposed()) {
        int finalI = i;
        new Thread(() -> emitter.onNext("value_" + finalI)).start();
      }
    }
    if (!emitter.isDisposed()) {
      emitter.onComplete();
    }
  });

  observable.subscribe(System.out::println);
}

上面的代码工作正常,打印 value_1value_9。但是当我在订阅前添加一个睡眠时,最后一条消息value_9不会被打印出来,像这样:

@Test public void testCreateAsync() throws InterruptedException {
  ...
  Thread.sleep(3000);
  observable.subscribe(System.out::println);
}

感谢任何关于此问题的讨论。

ps: java版本为1.8,RxJava版本为2.1.1.

有两个问题:

  1. 同时从不同线程调用 onNext
  2. onComplete 可能会在之前的线程能够发出它们的值之前执行,这会阻止任何进一步的 onNext 排放。

睡眠没有特别的影响,重复执行上面的代码有时会产生这种行为。

(我不清楚你想通过此设置实现什么)。