Rxjava - 在订阅之前在 PublishSubject 上发出
Rxjava - emit on PublishSubject before subscribe
考虑一个场景,我们有一个流发出字符串,我们想将字符串保存在文件中。
我正在使用 PublishSubject,这很好用:
Subject<String> stream = PublishSubject.create();
stream.subscribe(str -> saveFile(str));
mReverseGeocoderStream.onNext("some-string1")
mReverseGeocoderStream.onNext("some-string2")
然而,这不起作用(只有 some-string2
被传送)
Subject<String> stream = PublishSubject.create();
mReverseGeocoderStream.onNext("some-string1")
stream.subscribe(str -> saveFile(str));
mReverseGeocoderStream.onNext("some-string2")
有没有办法让第二种情况也起作用?
即,我们可以更改 PublishSubject
以确保它缓冲事件直到订阅者使用它们吗?
请注意,BehaviorSubject
不是一个选项,因为重新订阅会导致另一个文件保存。它没有 "consuming events".
的概念
我发现 UnicastSubject 几乎是我想要的,除了当我取消订阅并稍后重新订阅不同的订阅者时它失败并出现 IllegalStateException。
用例:
假设我们有一个 android 应用程序。它发出网络请求,在网络请求的后面,它需要显示一个对话框。在发出请求时,用户将应用置于后台。此时,我们取消订阅正在监听信号以显示对话的观察者。
网络请求返回并发出信号以显示对话已被触发到流中。此时没有人在听。用户将应用置于前台。新订阅者附加到网络请求管理器 (ViewModel)。此时,我希望将 "unconsumed" 信号传递给订阅者。
注意:我不能使用行为主题。如果我这样做,每次用户背景和前景应用程序都会显示对话框。我希望在显示对话框后消耗并完成事件。
做了一些更多的研究并发现了这个:
https://gist.github.com/xsveda/8c556516079fde97d04b4b7e14a18463
来自 :
请注意,它使用了 Relay,但如果您不想引入其他依赖项,可以轻松地将 Relay 替换为 Subject。
我对其他解决方案持开放态度,也许会批评为什么这个解决方案不是一个好的解决方案。
/**
* Relay that buffers values when no Observer subscribed and replays them to Observer as requested. Such values are not replayed
* to any other Observer.
* <p>
* This relay holds an unbounded internal buffer.
* <p>
* This relay allows only a single Observer at a time to be subscribed to it.
* <p>
* If more than one Observer attempts to subscribe to this Relay at the same time, they
* will receive an IllegalStateException.
*
* @param <T> the value type received and emitted by this Relay subclass
*/
public final class CacheRelay<T> extends Relay<T> {
private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
private final PublishRelay<T> relay = PublishRelay.create();
private CacheRelay() {
}
public static <T> CacheRelay<T> create() {
return new CacheRelay<>();
}
@Override
public void accept(T value) {
if (relay.hasObservers()) {
relay.accept(value);
} else {
queue.add(value);
}
}
@Override
public boolean hasObservers() {
return relay.hasObservers();
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (hasObservers()) {
EmptyDisposable.error(new IllegalStateException("Only a single observer at a time allowed."), observer);
} else {
for (T element; (element = queue.poll()) != null; ) {
observer.onNext(element);
}
relay.subscribeActual(observer);
}
}
}
考虑一个场景,我们有一个流发出字符串,我们想将字符串保存在文件中。
我正在使用 PublishSubject,这很好用:
Subject<String> stream = PublishSubject.create();
stream.subscribe(str -> saveFile(str));
mReverseGeocoderStream.onNext("some-string1")
mReverseGeocoderStream.onNext("some-string2")
然而,这不起作用(只有 some-string2
被传送)
Subject<String> stream = PublishSubject.create();
mReverseGeocoderStream.onNext("some-string1")
stream.subscribe(str -> saveFile(str));
mReverseGeocoderStream.onNext("some-string2")
有没有办法让第二种情况也起作用?
即,我们可以更改 PublishSubject
以确保它缓冲事件直到订阅者使用它们吗?
请注意,BehaviorSubject
不是一个选项,因为重新订阅会导致另一个文件保存。它没有 "consuming events".
我发现 UnicastSubject 几乎是我想要的,除了当我取消订阅并稍后重新订阅不同的订阅者时它失败并出现 IllegalStateException。
用例:
假设我们有一个 android 应用程序。它发出网络请求,在网络请求的后面,它需要显示一个对话框。在发出请求时,用户将应用置于后台。此时,我们取消订阅正在监听信号以显示对话的观察者。
网络请求返回并发出信号以显示对话已被触发到流中。此时没有人在听。用户将应用置于前台。新订阅者附加到网络请求管理器 (ViewModel)。此时,我希望将 "unconsumed" 信号传递给订阅者。
注意:我不能使用行为主题。如果我这样做,每次用户背景和前景应用程序都会显示对话框。我希望在显示对话框后消耗并完成事件。
做了一些更多的研究并发现了这个:
https://gist.github.com/xsveda/8c556516079fde97d04b4b7e14a18463
来自 :
请注意,它使用了 Relay,但如果您不想引入其他依赖项,可以轻松地将 Relay 替换为 Subject。
我对其他解决方案持开放态度,也许会批评为什么这个解决方案不是一个好的解决方案。
/**
* Relay that buffers values when no Observer subscribed and replays them to Observer as requested. Such values are not replayed
* to any other Observer.
* <p>
* This relay holds an unbounded internal buffer.
* <p>
* This relay allows only a single Observer at a time to be subscribed to it.
* <p>
* If more than one Observer attempts to subscribe to this Relay at the same time, they
* will receive an IllegalStateException.
*
* @param <T> the value type received and emitted by this Relay subclass
*/
public final class CacheRelay<T> extends Relay<T> {
private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
private final PublishRelay<T> relay = PublishRelay.create();
private CacheRelay() {
}
public static <T> CacheRelay<T> create() {
return new CacheRelay<>();
}
@Override
public void accept(T value) {
if (relay.hasObservers()) {
relay.accept(value);
} else {
queue.add(value);
}
}
@Override
public boolean hasObservers() {
return relay.hasObservers();
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (hasObservers()) {
EmptyDisposable.error(new IllegalStateException("Only a single observer at a time allowed."), observer);
} else {
for (T element; (element = queue.poll()) != null; ) {
observer.onNext(element);
}
relay.subscribeActual(observer);
}
}
}