Rxjava - 在订阅之前在 PublishSubject 上发出

Rxjava - emit on PublishSubject before subscribe

考虑一个场景,我们有一个流发出字符串,我们想将字符串保存在文件中。

我正在使用 PublishSubject,这很好用:

Subject<String> stream = PublishSubject.create();
stream.subscribe(str -> saveFile(str));
mReverseGeocoderStream.onNext("some-string1")
mReverseGeocoderStream.onNext("some-string2")

然而,这不起作用(只有 some-string2 被传送)

Subject<String> stream = PublishSubject.create();
mReverseGeocoderStream.onNext("some-string1")
stream.subscribe(str -> saveFile(str));
mReverseGeocoderStream.onNext("some-string2")

有没有办法让第二种情况也起作用?

即,我们可以更改 PublishSubject 以确保它缓冲事件直到订阅者使用它们吗?

请注意,BehaviorSubject 不是一个选项,因为重新订阅会导致另一个文件保存。它没有 "consuming events".

的概念

我发现 UnicastSubject 几乎是我想要的,除了当我取消订阅并稍后重新订阅不同的订阅者时它失败并出现 IllegalStateException。


用例:

假设我们有一个 android 应用程序。它发出网络请求,在网络请求的后面,它需要显示一个对话框。在发出请求时,用户将应用置于后台。此时,我们取消订阅正在监听信号以显示对话的观察者。

网络请求返回并发出信号以显示对话已被触发到流中。此时没有人在听。用户将应用置于前台。新订阅者附加到网络请求管理器 (ViewModel)。此时,我希望将 "unconsumed" 信号传递给订阅者。

注意:我不能使用行为主题。如果我这样做,每次用户背景和前景应用程序都会显示对话框。我希望在显示对话框后消耗并完成事件。

做了一些更多的研究并发现了这个:

https://gist.github.com/xsveda/8c556516079fde97d04b4b7e14a18463

来自 :

请注意,它使用了 Relay,但如果您不想引入其他依赖项,可以轻松地将 Relay 替换为 Subject。

我对其他解决方案持开放态度,也许会批评为什么这个解决方案不是一个好的解决方案。

/**
 * Relay that buffers values when no Observer subscribed and replays them to Observer as requested. Such values are not replayed
 * to any other Observer.
 * <p>
 * This relay holds an unbounded internal buffer.
 * <p>
 * This relay allows only a single Observer at a time to be subscribed to it.
 * <p>
 * If more than one Observer attempts to subscribe to this Relay at the same time, they
 * will receive an IllegalStateException.
 *
 * @param <T> the value type received and emitted by this Relay subclass
 */
public final class CacheRelay<T> extends Relay<T> {

    private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
    private final PublishRelay<T> relay = PublishRelay.create();

    private CacheRelay() {
    }

    public static <T> CacheRelay<T> create() {
        return new CacheRelay<>();
    }

    @Override
    public void accept(T value) {
        if (relay.hasObservers()) {
            relay.accept(value);
        } else {
            queue.add(value);
        }
    }

    @Override
    public boolean hasObservers() {
        return relay.hasObservers();
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (hasObservers()) {
            EmptyDisposable.error(new IllegalStateException("Only a single observer at a time allowed."), observer);
        } else {
            for (T element; (element = queue.poll()) != null; ) {
                observer.onNext(element);
            }
            relay.subscribeActual(observer);
        }
    }
}