使用已处置的观察者不会重新订阅源

Using disposed observer does not re-subscribe to the source

我正在尝试通过仅创建 DisposableSingleObserver/DisposableObserver 的单个实例并将它们通过subscribeWith() 流上的方法如下所示:

public class SomeClass {
    private DisposableSingleObserver<Object> observer;

    public SomeClass() {
        observer = new DisposableSingleObserver<Object>() {
            @Override
            public void onSuccess(Object object) {
                ...
            }

            @Override
            public void onError(Throwable throwable) {
                ...
            }
        };
    }

    public void doSomeStuff() {
        singleStream.subscribeOn(...)
            .observeOn(...)
            .subscribeWith(observer);
    }
}

当我尝试使用消息多次订阅单个观察者实例时,上面的代码导致 ProtocolViolationException

io.reactivex.exceptions.ProtocolViolationException: It is not allowed to subscribe with a(n) com.package.name.SomeClass multiple times. Please create a fresh instance of com.package.name.SomeClass and subscribe that to the target source instead.

所以我修改了代码如下:

public class SomeClass {
    ...

    public void doSomeStuff() {
        if (observer != null) {
            observer.dispose();
        }

        singleStream.subscribeOn(...)
            .observerOn(...)
            .subscribeWith(observer);
    }
}

当我执行上面的代码时, ProtocolViolationException 不再抛出,我能够成功地从单个流中获取事件。但是当我尝试调用 doSomeStuff() 方法几次时,它在第一次调用时成功完成,但在第二次调用时没有发出任何事件。我能够确认订阅通过 doOnSubscribe() 发生了两次,但是单流的代码发射事件从未在第二次调用时执行。所以我有三个问题:

  1. 为什么 ProtocolViolationException 首先被抛出?
  2. 为什么我能够在第一次订阅时获得事件,但在第二次订阅时却无法获得事件,即使我们在这两种情况下都订阅了源?
  3. 如何通过仅使用单个实例来重用观察者?

1) 您不能重复使用 DisposableSingleObserver 及其表兄弟,因为它们是有状态的并且只能使用一次。这是由于 Single 规定的协议:只有一个 onSubscribe 后跟最多一个 onSuccessonError。第二个订阅违反了这个协议。

2) 处置 DisposableSingleObserver 将其置于处置状态,任何后续订阅尝试都将被视为立即处置。

3) 你不应该。为什么要订阅多次,为什么不能每次都使用新的 DisposableSingleObserver。您应该重新考虑您的数据流。