RxJava 2.x: serialize() doesn't work
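I tried to test serialize() with the code below.
I called onNext 1,000,000 times from each of 2 different threads.
I then expected the subscriber to have counted 2,000,000 items by the time onComplete was called.
However, I don't get the expected value.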
private static int count = 0;

private static void setCount(int value) {
    count = value;
}

private static final int TEST_LOOP = 10;
private static final int NEXT_LOOP = 1_000_000;

@Test
public void test() throws Exception {
    for (int test = 0; test < TEST_LOOP; test++) {
        Flowable.create(emitter -> {
            ExecutorService service = Executors.newCachedThreadPool();
            emitter.setCancellable(() -> service.shutdown());
            Future<Boolean> future1 = service.submit(() -> {
                for (int i = 0; i < NEXT_LOOP; i++) {
                    emitter.onNext(i);
                }
                return true;
            });
            Future<Boolean> future2 = service.submit(() -> {
                for (int i = 0; i < NEXT_LOOP; i++) {
                    emitter.onNext(i);
                }
                return true;
            });
            if (future1.get(1, TimeUnit.SECONDS)
                    && future2.get(1, TimeUnit.SECONDS)) {
                emitter.onComplete();
            }
        }, BackpressureStrategy.BUFFER)
        .serialize()
        .cast(Integer.class)
        .subscribe(new Subscriber<Integer>() {
            private int count = 0;

            @Override
            public void onSubscribe(Subscription s) {
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Integer t) {
                count++;
            }

            @Override
            public void onError(Throwable t) {
                fail(t.getMessage());
            }

            @Override
            public void onComplete() {
                setCount(count);
            }
        });
        assertThat(count, is(NEXT_LOOP * 2));
    }
}
I wonder whether serialize() doesn't work or whether I have misunderstood how to use it.
I checked the source of SerializedSubscriber.
@Override
public void onNext(T t) {
    ...
    synchronized (this) {
        ...
    }
    actual.onNext(t);
    emitLoop();
}
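Since actual.onNext(t); is called outside the synchronized block, I guess actual.onNext(t); can be called from different threads at the same time. I also suspect that onComplete may be called before onNext has finished.
I'm using RxJava 2.0.4.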
This is not a bug but a misuse of FlowableEmitter:
The onNext, onError and onComplete methods should be called in a sequential manner, just like the Subscriber's methods. Use serialize() if you want to ensure this. The other methods are thread-safe.
FlowableEmitter.serialize()
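Applying Flowable.serialize() downstream is too late for the create operator: by then the emitter itself has already been called concurrently from the two threads. Serialize at the emitter with FlowableEmitter.serialize() instead.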
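To illustrate what serializing at the point of emission means, here is a library-free sketch of the emitter-loop idea visible in the SerializedSubscriber excerpt from the question. It is not RxJava's actual code: SerializedSinkSketch, SerializedSink and countThroughSerializedSink are hypothetical names chosen for this example, and the queue handling is simplified. The downstream call sits outside the synchronized block, yet only one thread at a time can reach it, because the emitting flag is flipped under the lock and every other caller merely enqueues its value and returns.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

public class SerializedSinkSketch {

    /** Hypothetical helper: forwards values to a non-thread-safe consumer, one thread at a time. */
    static final class SerializedSink<T> {
        private final Consumer<T> actual;
        private boolean emitting;   // guarded by "this"
        private Queue<T> queue;     // guarded by "this"

        SerializedSink(Consumer<T> actual) {
            this.actual = actual;
        }

        void onNext(T t) {
            synchronized (this) {
                if (emitting) {
                    // Another thread is currently delivering: hand the value over and leave.
                    if (queue == null) {
                        queue = new ArrayDeque<>();
                    }
                    queue.add(t);
                    return;
                }
                emitting = true;
            }
            // Only the thread that flipped "emitting" gets here, so this call is never concurrent.
            actual.accept(t);
            emitLoop();
        }

        private void emitLoop() {
            for (;;) {
                Queue<T> q;
                synchronized (this) {
                    q = queue;
                    if (q == null) {
                        // Nothing was queued meanwhile: give up the emitter role.
                        emitting = false;
                        return;
                    }
                    queue = null; // detach the batch; later callers start a fresh queue
                }
                for (T v : q) {
                    actual.accept(v);
                }
            }
        }
    }

    /** Two threads push perThread values each; returns how many the consumer saw. */
    public static int countThroughSerializedSink(int perThread) {
        int[] count = {0}; // deliberately a plain, non-atomic counter
        SerializedSink<Integer> sink = new SerializedSink<>(v -> count[0]++);
        Runnable producer = () -> {
            for (int i = 0; i < perThread; i++) {
                sink.onNext(i);
            }
        };
        Thread t1 = new Thread(producer);
        Thread t2 = new Thread(producer);
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for producers", e);
        }
        return count[0];
    }

    public static void main(String[] args) {
        System.out.println(countThroughSerializedSink(1_000_000)); // prints 2000000
    }
}
```

In the test from the question, the corresponding change would be to call serialize() on the emitter at the top of the create lambda (FlowableEmitter.serialize() returns a FlowableEmitter) and have both tasks call onNext on the returned emitter. The downstream .serialize() should then no longer be needed.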