RxJava 2.x: serialize() doesn't work

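I tried to test serialize() with the code below.

I call onNext 1,000,000 times from each of 2 different threads, so I expect the count in onComplete to be 2,000,000.

However, I don't get the expected value.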

private static int count = 0;

private static void setCount(int value) {
  count = value;
}

private static final int TEST_LOOP = 10;

private static final int NEXT_LOOP = 1_000_000;

@Test
public void test() throws Exception {

  for (int test = 0; test < TEST_LOOP; test++) {
    Flowable.create(emitter -> {
      ExecutorService service = Executors.newCachedThreadPool();
      emitter.setCancellable(() -> service.shutdown());

      Future<Boolean> future1 = service.submit(() -> {
        for (int i = 0; i < NEXT_LOOP; i++) {
          emitter.onNext(i);
        }
        return true;
      });

      Future<Boolean> future2 = service.submit(() -> {
        for (int i = 0; i < NEXT_LOOP; i++) {
          emitter.onNext(i);
        }
        return true;
      });

      if (future1.get(1, TimeUnit.SECONDS)
          && future2.get(1, TimeUnit.SECONDS)) {
        emitter.onComplete();
      }
    }, BackpressureStrategy.BUFFER)
        .serialize()
        .cast(Integer.class)
        .subscribe(new Subscriber<Integer>() {

          private int count = 0;

          @Override
          public void onSubscribe(Subscription s) {
            s.request(Long.MAX_VALUE);
          }

          @Override
          public void onNext(Integer t) {
            count++;
          }

          @Override
          public void onError(Throwable t) {
            fail(t.getMessage());
          }

          @Override
          public void onComplete() {
            setCount(count);
          }
        });

    assertThat(count, is(NEXT_LOOP * 2));
  }
}

I wonder whether serialize() doesn't work or whether I misunderstand how serialize() is meant to be used.

I checked the source of SerializedSubscriber.

@Override
public void onNext(T t) {
  ...
  synchronized(this){
    ...
  }
  actual.onNext(t);
  emitLoop();
}

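Since actual.onNext(t) is called outside the synchronized block, I guess actual.onNext(t) can be called from different threads at the same time. I also suspect that onComplete may be called before onNext has finished.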

I am using RxJava 2.0.4.

This is not a bug but a misuse of FlowableEmitter. Its Javadoc says:

The onNext, onError and onComplete methods should be called in a sequential manner, just like the Subscriber's methods. Use serialize() if you want to ensure this. The other methods are thread-safe.

FlowableEmitter.serialize()

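Applying Flowable.serialize() downstream is too late for the create operator: by then the emitter has already been called concurrently from both threads, so the calls have to be serialized on the emitter itself.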
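As a rough sketch of that fix (not the asker's exact test), the emitter can be wrapped with FlowableEmitter.serialize() inside create, and both threads then call the wrapped emitter. The class name SerializedEmitterExample, the helper countItems(), and the 30-second timeout are placeholders chosen for this illustration, and the code assumes RxJava 2.x is on the classpath.

```java
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical example class; only the RxJava calls come from the library.
public class SerializedEmitterExample {

    static final int NEXT_LOOP = 1_000_000;

    // Emits NEXT_LOOP items from each of two threads and returns
    // how many items reached the downstream consumer.
    static long countItems() {
        return Flowable.<Integer>create(emitter -> {
            // Serialize at the emitter, so concurrent onNext calls are
            // sequenced before the create operator processes them.
            FlowableEmitter<Integer> serialized = emitter.serialize();

            ExecutorService service = Executors.newCachedThreadPool();
            // Shut the pool down when the flow terminates or is cancelled.
            serialized.setCancellable(service::shutdown);

            Runnable producer = () -> {
                for (int i = 0; i < NEXT_LOOP; i++) {
                    serialized.onNext(i);
                }
            };
            Future<?> future1 = service.submit(producer);
            Future<?> future2 = service.submit(producer);

            // Wait for both producers before completing (timeout is arbitrary).
            future1.get(30, TimeUnit.SECONDS);
            future2.get(30, TimeUnit.SECONDS);
            serialized.onComplete();
        }, BackpressureStrategy.BUFFER)
                .count()        // Single<Long> with the number of items received
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(countItems());
    }
}
```

With the serialized emitter the count should come out as NEXT_LOOP * 2, and the downstream .serialize() call from the question is no longer needed.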