使用已处置的观察者不会重新订阅源
Using disposed observer does not re-subscribe to the source
我正在尝试通过仅创建 DisposableSingleObserver/DisposableObserver
的单个实例并将它们通过subscribeWith()
流上的方法如下所示:
public class SomeClass {
private DisposableSingleObserver<Object> observer;
public SomeClass() {
observer = new DisposableSingleObserver<Object>() {
@Override
public void onSuccess(Object object) {
...
}
@Override
public void onError(Throwable throwable) {
...
}
};
}
public void doSomeStuff() {
singleStream.subscribeOn(...)
.observeOn(...)
.subscribeWith(observer);
}
}
当我尝试使用消息多次订阅单个观察者实例时,上面的代码导致 ProtocolViolationException
:
io.reactivex.exceptions.ProtocolViolationException: It is not allowed to subscribe with a(n) com.package.name.SomeClass multiple times. Please create a fresh instance of com.package.name.SomeClass and subscribe that to the target source instead.
所以我修改了代码如下:
public class SomeClass {
...
public void doSomeStuff() {
if (observer != null) {
observer.dispose();
}
singleStream.subscribeOn(...)
.observerOn(...)
.subscribeWith(observer);
}
}
当我执行上面的代码时, ProtocolViolationException
不再抛出,我能够成功地从单个流中获取事件。但是当我尝试调用 doSomeStuff()
方法几次时,它在第一次调用时成功完成,但在第二次调用时没有发出任何事件。我能够确认订阅通过 doOnSubscribe()
发生了两次,但是单流的代码发射事件从未在第二次调用时执行。所以我有三个问题:
- 为什么
ProtocolViolationException
首先被抛出?
- 为什么我能够在第一次订阅时获得事件,但在第二次订阅时却无法获得事件,即使我们在这两种情况下都订阅了源?
- 如何通过仅使用单个实例来重用观察者?
1) 您不能重复使用 DisposableSingleObserver
及其表兄弟,因为它们是有状态的并且只能使用一次。这是由于 Single
规定的协议:只有一个 onSubscribe
后跟最多一个 onSuccess
或 onError
。第二个订阅违反了这个协议。
2) 处置 DisposableSingleObserver
将其置于处置状态,任何后续订阅尝试都将被视为立即处置。
3) 你不应该。为什么要订阅多次,为什么不能每次都使用新的 DisposableSingleObserver
。您应该重新考虑您的数据流。
我正在尝试通过仅创建 DisposableSingleObserver/DisposableObserver
的单个实例并将它们通过subscribeWith()
流上的方法如下所示:
public class SomeClass {
private DisposableSingleObserver<Object> observer;
public SomeClass() {
observer = new DisposableSingleObserver<Object>() {
@Override
public void onSuccess(Object object) {
...
}
@Override
public void onError(Throwable throwable) {
...
}
};
}
public void doSomeStuff() {
singleStream.subscribeOn(...)
.observeOn(...)
.subscribeWith(observer);
}
}
当我尝试使用消息多次订阅单个观察者实例时,上面的代码导致 ProtocolViolationException
:
io.reactivex.exceptions.ProtocolViolationException: It is not allowed to subscribe with a(n) com.package.name.SomeClass multiple times. Please create a fresh instance of com.package.name.SomeClass and subscribe that to the target source instead.
所以我修改了代码如下:
public class SomeClass {
...
public void doSomeStuff() {
if (observer != null) {
observer.dispose();
}
singleStream.subscribeOn(...)
.observerOn(...)
.subscribeWith(observer);
}
}
当我执行上面的代码时, ProtocolViolationException
不再抛出,我能够成功地从单个流中获取事件。但是当我尝试调用 doSomeStuff()
方法几次时,它在第一次调用时成功完成,但在第二次调用时没有发出任何事件。我能够确认订阅通过 doOnSubscribe()
发生了两次,但是单流的代码发射事件从未在第二次调用时执行。所以我有三个问题:
- 为什么
ProtocolViolationException
首先被抛出? - 为什么我能够在第一次订阅时获得事件,但在第二次订阅时却无法获得事件,即使我们在这两种情况下都订阅了源?
- 如何通过仅使用单个实例来重用观察者?
1) 您不能重复使用 DisposableSingleObserver
及其表兄弟,因为它们是有状态的并且只能使用一次。这是由于 Single
规定的协议:只有一个 onSubscribe
后跟最多一个 onSuccess
或 onError
。第二个订阅违反了这个协议。
2) 处置 DisposableSingleObserver
将其置于处置状态,任何后续订阅尝试都将被视为立即处置。
3) 你不应该。为什么要订阅多次,为什么不能每次都使用新的 DisposableSingleObserver
。您应该重新考虑您的数据流。