单独重放合并的流
Replaying merged streams individually
我有一个 ObservableTransformer,由几个合并在一起的子流组成,如下例所示。
ObservableTransformer<Action, Result> actionsToResults =
actions -> actions.publish(shared ->
Observable.merge(
Observable.merge(
shared.ofType(SendMessageAction.class).compose(sendMessageTransformer),
shared.ofType(CheckMessageInputAction.class).compose(checkIfEmptyTransformer)),
model.messagesObservable
.observeOn(AndroidSchedulers.mainThread())
)).replay(1).autoConnect();
// ╔════╗
// ╔══╣ ╠══╗
// ══╣ ╚════╝ ╠══(replay)>
// ╚══════════╝
我正在重播最后发出的元素,但我想要的是单独重播流。像这样:
// ╔════╗
// ╔══╣ ╠═(replay)═╗
// ══╣ ╚════╝ ╠══>
// ╚═════════(replay)═╝
因此,当我订阅整个流时,它会发出这两个内部流的最后一个元素。这能做到吗?
编辑:为了让我的问题更清楚,我将扩展我认为问题所在的位置。
我希望整个流成为可连接的可观察对象。这很重要,因为它位于无头片段中,并且不应该重新触发其所有轮换调用。同时我想单独重播这些流。在外部合并中移动 .replay(1).autoConnect()
是行不通的。整个流将在每次新订阅时重新创建。在外部合并中添加自定义运算符,例如:
private ObservableTransformer<Action, Result> actionsToResults =
actions -> actions.publish(shared -> Observable.merge(Observable.merge(
shared.ofType(SendMessageAction.class).compose(sendMessageTransformer),
shared.ofType(CheckMessageInputAction.class).compose(checkIfEmptyTransformer)).lift(new ReplayLastOperator<Result>())
model.messagesObservable.lift(new ReplayLastOperator<>()).observeOn(AndroidSchedulers.mainThread())
));
其中 ReplayLastOperator<Result>()
重播每个新订阅的最后一个元素等同于移动 replay(1).autoConnect()
,整个流在每个新订阅时重新创建。所以没有什么可以重播的。上面带有 .publish().autoConnect()
的解决方案,只会订阅一次内部流,所以我失去了检测上游新订阅的能力,因此重播也不起作用。
我 运行 没主意了。
是的。
您可以创建一个自定义运算符来缓存它看到的最后一个项目。
然后当从上游接收到 onComplete 时,它会重新发出它在发出 onComplete 本身之前看到的最后一个项目。该运算符的通用形式如下所示:
public class ReplayLast <T> implements ObservableOperator<T, T> {
@Override
public Observer<? super T> apply(@NonNull final Observer<? super T> observer) throws Exception {
return new Observer<T>(){
Disposable disposable;
T lastItem; // cache for last item
@Override
public void onSubscribe(@NonNull Disposable d) {
disposable = d;
observer.onSubscribe(d);
}
@Override
public void onNext(@NonNull T t) {
lastItem = t; // cache each item from up stream
observer.onNext(t); // emit as usual
}
@Override
public void onError(@NonNull Throwable e) {
observer.onError(e);
}
@Override
public void onComplete() {
if(lastItem != null){
// use the cache to emit the item again before signaling the complete downstream
observer.onNext(lastItem);
}
observer.onComplete();
}
};
}
那就只需要用在合适的地方就可以了。应该类似于以下内容:
ObservableTransformer<Action, Result> actionsToResults =
actions -> actions.publish(shared ->
Observable.merge(
Observable.merge(
shared.ofType(SendMessageAction.class).compose(sendMessageTransformer),
shared.ofType(CheckMessageInputAction.class).compose(checkIfEmptyTransformer)).lift(new ReplayLast()),
model.messagesObservable.lift(new ReplayLast())
.observeOn(AndroidSchedulers.mainThread())
)).autoConnect();
我针对我的特殊情况解决了这个问题。我将添加一些额外的信息,供您查看它是否适用于您的情况。
在我的例子中,我想要重放 return 不同类型的所有流,由抽象 class 密封。所以我对这个问题的解决方案是实现我自己的 ObservableTransoformer
,命名为 ReplayLastByClassType
,它兑现映射中按类型接收的最后一个元素。您只需调用 .compose(ReplayLastByClassType.<SealingClass>instance()
或 ReplayLastByClassType.instance()
.
即可应用它
转换器在订阅时重新发送兑现值。之后它继续在连续的 onNext
调用中传递元素。
- 警告 1. 在 onSubscribe 中编辑的元素顺序 return 不一定保留。(在我的情况下这不是必需的)。
- 警告 2. 即使转换后的可观察对象的所有订阅者取消订阅,上游可观察对象也不会被取消订阅!!!这是设计使然。
这种方法也适用于您,即使您要重播 return 个相同类型的对象也是如此。您必须将结果包装到单独的助手 classes 中以强制分隔 class.
终于定制了 ObservableTransformer
,复制粘贴快乐 :)
public final class ReplayLastByClassType<T> implements ObservableTransformer<T, T> {
private static final ReplayLastByClassType<Object> INSTANCE = new ReplayLastByClassType<>();
/** The singleton instance of this transformer **/
@SuppressWarnings("unchecked")
public static <T> ReplayLastByClassType<T> instance() {
return (ReplayLastByClassType<T>) INSTANCE;
}
private ReplayLastByClassType() {
}
@Override
public Observable<T> apply(Observable<T> upstream) {
LastSeen<T> lastSeen = new LastSeen<>();
return new LastSeenObservable<>(upstream.doOnNext(lastSeen).publish().autoConnect(), lastSeen);
}
static final class LastSeen<T> implements Consumer<T> {
ConcurrentHashMap<Class, T> values = new ConcurrentHashMap<>();
@Override public void accept(T latest) {
values.put(latest.getClass(), latest);
}
}
static final class LastSeenObservable<T> extends Observable<T> {
private final Observable<T> upstream;
private final LastSeen<T> lastSeen;
LastSeenObservable(Observable<T> upstream, LastSeen<T> lastSeen) {
this.upstream = upstream;
this.lastSeen = lastSeen;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
upstream.subscribe(new LastSeenObserver<T>(observer, lastSeen));
}
}
static final class LastSeenObserver<T> implements Observer<T> {
private final Observer<? super T> downstream;
private final LastSeen<T> lastSeen;
LastSeenObserver(Observer<? super T> downstream, LastSeen<T> lastSeen) {
this.downstream = downstream;
this.lastSeen = lastSeen;
}
@Override public void onSubscribe(Disposable d) {
downstream.onSubscribe(d);
Map<Class, T> values = lastSeen.values;
for (T t: values.values()) {
downstream.onNext(t);
}
}
@Override public void onNext(T value) {
downstream.onNext(value);
}
@Override public void onComplete() {
downstream.onComplete();
}
@Override public void onError(Throwable e) {
downstream.onError(e);
}
}
}
PS:如果您发现代码中有任何错误,请在评论中纠正我。
我有一个 ObservableTransformer,由几个合并在一起的子流组成,如下例所示。
ObservableTransformer<Action, Result> actionsToResults =
actions -> actions.publish(shared ->
Observable.merge(
Observable.merge(
shared.ofType(SendMessageAction.class).compose(sendMessageTransformer),
shared.ofType(CheckMessageInputAction.class).compose(checkIfEmptyTransformer)),
model.messagesObservable
.observeOn(AndroidSchedulers.mainThread())
)).replay(1).autoConnect();
// ╔════╗
// ╔══╣ ╠══╗
// ══╣ ╚════╝ ╠══(replay)>
// ╚══════════╝
我正在重播最后发出的元素,但我想要的是单独重播流。像这样:
// ╔════╗
// ╔══╣ ╠═(replay)═╗
// ══╣ ╚════╝ ╠══>
// ╚═════════(replay)═╝
因此,当我订阅整个流时,它会发出这两个内部流的最后一个元素。这能做到吗?
编辑:为了让我的问题更清楚,我将扩展我认为问题所在的位置。
我希望整个流成为可连接的可观察对象。这很重要,因为它位于无头片段中,并且不应该重新触发其所有轮换调用。同时我想单独重播这些流。在外部合并中移动 .replay(1).autoConnect()
是行不通的。整个流将在每次新订阅时重新创建。在外部合并中添加自定义运算符,例如:
private ObservableTransformer<Action, Result> actionsToResults =
actions -> actions.publish(shared -> Observable.merge(Observable.merge(
shared.ofType(SendMessageAction.class).compose(sendMessageTransformer),
shared.ofType(CheckMessageInputAction.class).compose(checkIfEmptyTransformer)).lift(new ReplayLastOperator<Result>())
model.messagesObservable.lift(new ReplayLastOperator<>()).observeOn(AndroidSchedulers.mainThread())
));
其中 ReplayLastOperator<Result>()
重播每个新订阅的最后一个元素等同于移动 replay(1).autoConnect()
,整个流在每个新订阅时重新创建。所以没有什么可以重播的。上面带有 .publish().autoConnect()
的解决方案,只会订阅一次内部流,所以我失去了检测上游新订阅的能力,因此重播也不起作用。
我 运行 没主意了。
是的。
您可以创建一个自定义运算符来缓存它看到的最后一个项目。
然后当从上游接收到 onComplete 时,它会重新发出它在发出 onComplete 本身之前看到的最后一个项目。该运算符的通用形式如下所示:
public class ReplayLast <T> implements ObservableOperator<T, T> {
@Override
public Observer<? super T> apply(@NonNull final Observer<? super T> observer) throws Exception {
return new Observer<T>(){
Disposable disposable;
T lastItem; // cache for last item
@Override
public void onSubscribe(@NonNull Disposable d) {
disposable = d;
observer.onSubscribe(d);
}
@Override
public void onNext(@NonNull T t) {
lastItem = t; // cache each item from up stream
observer.onNext(t); // emit as usual
}
@Override
public void onError(@NonNull Throwable e) {
observer.onError(e);
}
@Override
public void onComplete() {
if(lastItem != null){
// use the cache to emit the item again before signaling the complete downstream
observer.onNext(lastItem);
}
observer.onComplete();
}
};
}
那就只需要用在合适的地方就可以了。应该类似于以下内容:
ObservableTransformer<Action, Result> actionsToResults =
actions -> actions.publish(shared ->
Observable.merge(
Observable.merge(
shared.ofType(SendMessageAction.class).compose(sendMessageTransformer),
shared.ofType(CheckMessageInputAction.class).compose(checkIfEmptyTransformer)).lift(new ReplayLast()),
model.messagesObservable.lift(new ReplayLast())
.observeOn(AndroidSchedulers.mainThread())
)).autoConnect();
我针对我的特殊情况解决了这个问题。我将添加一些额外的信息,供您查看它是否适用于您的情况。
在我的例子中,我想要重放 return 不同类型的所有流,由抽象 class 密封。所以我对这个问题的解决方案是实现我自己的 ObservableTransoformer
,命名为 ReplayLastByClassType
,它兑现映射中按类型接收的最后一个元素。您只需调用 .compose(ReplayLastByClassType.<SealingClass>instance()
或 ReplayLastByClassType.instance()
.
转换器在订阅时重新发送兑现值。之后它继续在连续的 onNext
调用中传递元素。
- 警告 1. 在 onSubscribe 中编辑的元素顺序 return 不一定保留。(在我的情况下这不是必需的)。
- 警告 2. 即使转换后的可观察对象的所有订阅者取消订阅,上游可观察对象也不会被取消订阅!!!这是设计使然。
这种方法也适用于您,即使您要重播 return 个相同类型的对象也是如此。您必须将结果包装到单独的助手 classes 中以强制分隔 class.
终于定制了 ObservableTransformer
,复制粘贴快乐 :)
public final class ReplayLastByClassType<T> implements ObservableTransformer<T, T> {
private static final ReplayLastByClassType<Object> INSTANCE = new ReplayLastByClassType<>();
/** The singleton instance of this transformer **/
@SuppressWarnings("unchecked")
public static <T> ReplayLastByClassType<T> instance() {
return (ReplayLastByClassType<T>) INSTANCE;
}
private ReplayLastByClassType() {
}
@Override
public Observable<T> apply(Observable<T> upstream) {
LastSeen<T> lastSeen = new LastSeen<>();
return new LastSeenObservable<>(upstream.doOnNext(lastSeen).publish().autoConnect(), lastSeen);
}
static final class LastSeen<T> implements Consumer<T> {
ConcurrentHashMap<Class, T> values = new ConcurrentHashMap<>();
@Override public void accept(T latest) {
values.put(latest.getClass(), latest);
}
}
static final class LastSeenObservable<T> extends Observable<T> {
private final Observable<T> upstream;
private final LastSeen<T> lastSeen;
LastSeenObservable(Observable<T> upstream, LastSeen<T> lastSeen) {
this.upstream = upstream;
this.lastSeen = lastSeen;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
upstream.subscribe(new LastSeenObserver<T>(observer, lastSeen));
}
}
static final class LastSeenObserver<T> implements Observer<T> {
private final Observer<? super T> downstream;
private final LastSeen<T> lastSeen;
LastSeenObserver(Observer<? super T> downstream, LastSeen<T> lastSeen) {
this.downstream = downstream;
this.lastSeen = lastSeen;
}
@Override public void onSubscribe(Disposable d) {
downstream.onSubscribe(d);
Map<Class, T> values = lastSeen.values;
for (T t: values.values()) {
downstream.onNext(t);
}
}
@Override public void onNext(T value) {
downstream.onNext(value);
}
@Override public void onComplete() {
downstream.onComplete();
}
@Override public void onError(Throwable e) {
downstream.onError(e);
}
}
}
PS:如果您发现代码中有任何错误,请在评论中纠正我。