RxJava2.1.0:在不同线程上订阅时不调用 PublishSubject onNext
RxJava2.1.0 : PublishSubject onNext not invoked when subscribed on different threads
发现当订阅一个序列化的PublishSubject时,会在10-20毫秒内调用Subjects的onNext事件;新订阅者的 onNext 未被调用。
在下面的代码片段中; observe[1] 的值为“2000”,subscribeToSubject() 在对值为 1998 的 Subject 调用 onNext() 之后调用 subscribeToSubject() [2];我们看到,如果间隔为 10 毫秒,新订阅者将错过 Subject 触发的值 2000;然而,如果间隔为 50 毫秒或更长,则新订阅者似乎会收到预期值;这是预期的行为吗?
这种行为出现在 RxJava 2.1.0 上;似乎是某种竞争条件
public class PublishSubjectTest {
private final Subject<String> singlePropertyUpdateSubject =
PublishSubject.<String>create().toSerialized();
public static void main(String[] args) {
PublishSubjectTest obj= new PublishSubjectTest();
obj.sendEvents();
try {
Thread.currentThread().join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//[1]
private final String valueToObserve = "2000";
private void subscribeToSubject() {
System.out.println("Subscribing .....");
io.reactivex.Observable.range(1,10).subscribeOn(Schedulers.newThread()).subscribe(
value -> getAndObserve(valueToObserve).subscribe(observedValue -> System.out.println(" Value Received "+observedValue +" By "+Thread.currentThread() ))
);
}
private io.reactivex.Observable<String> getAndObserve(String value) {
final io.reactivex.Observable<String> observable = singlePropertyUpdateSubject
//.doOnNext(v-> System.out.println("Received value "+v))
.filter(v -> v.equals(value))
.doOnSubscribe(c-> System.out.println("Consumer subscribed "+c));
return observable;
}
// 50ms >= expected result ; Anything less than 10ms will fail.
private void sendEvents() {
io.reactivex.Observable.interval(10, TimeUnit.MILLISECONDS).subscribe(value -> {
String key = value.toString();
//System.out.println("Adding key "+key);
singlePropertyUpdateSubject.onNext(key);
//[2]
if (value == 1998){
subscribeToSubject();;
}
if (value%100==0) {
System.out.println(value);
}
});
}
好的,问题出在 PublishSubject 上; ReplaySubject 的使用似乎解决了并发订阅的问题;使用 ReplaySubject 时调用 system.exit(1) 的测试代码也存在问题;提供详细讨论 github.com/ReactiveX/RxJava/issues/6414 –
请将此问题视为已关闭。
发现当订阅一个序列化的PublishSubject时,会在10-20毫秒内调用Subjects的onNext事件;新订阅者的 onNext 未被调用。
在下面的代码片段中; observe[1] 的值为“2000”,subscribeToSubject() 在对值为 1998 的 Subject 调用 onNext() 之后调用 subscribeToSubject() [2];我们看到,如果间隔为 10 毫秒,新订阅者将错过 Subject 触发的值 2000;然而,如果间隔为 50 毫秒或更长,则新订阅者似乎会收到预期值;这是预期的行为吗?
这种行为出现在 RxJava 2.1.0 上;似乎是某种竞争条件
public class PublishSubjectTest {
private final Subject<String> singlePropertyUpdateSubject =
PublishSubject.<String>create().toSerialized();
public static void main(String[] args) {
PublishSubjectTest obj= new PublishSubjectTest();
obj.sendEvents();
try {
Thread.currentThread().join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//[1]
private final String valueToObserve = "2000";
private void subscribeToSubject() {
System.out.println("Subscribing .....");
io.reactivex.Observable.range(1,10).subscribeOn(Schedulers.newThread()).subscribe(
value -> getAndObserve(valueToObserve).subscribe(observedValue -> System.out.println(" Value Received "+observedValue +" By "+Thread.currentThread() ))
);
}
private io.reactivex.Observable<String> getAndObserve(String value) {
final io.reactivex.Observable<String> observable = singlePropertyUpdateSubject
//.doOnNext(v-> System.out.println("Received value "+v))
.filter(v -> v.equals(value))
.doOnSubscribe(c-> System.out.println("Consumer subscribed "+c));
return observable;
}
// 50ms >= expected result ; Anything less than 10ms will fail.
private void sendEvents() {
io.reactivex.Observable.interval(10, TimeUnit.MILLISECONDS).subscribe(value -> {
String key = value.toString();
//System.out.println("Adding key "+key);
singlePropertyUpdateSubject.onNext(key);
//[2]
if (value == 1998){
subscribeToSubject();;
}
if (value%100==0) {
System.out.println(value);
}
});
}
好的,问题出在 PublishSubject 上; ReplaySubject 的使用似乎解决了并发订阅的问题;使用 ReplaySubject 时调用 system.exit(1) 的测试代码也存在问题;提供详细讨论 github.com/ReactiveX/RxJava/issues/6414 –
请将此问题视为已关闭。