单独重放合并的流

Replaying merged streams individually

我有一个 ObservableTransformer,由几个合并在一起的子流组成,如下例所示。

ObservableTransformer<Action, Result> actionsToResults =
        actions -> actions.publish(shared ->
                Observable.merge(
                        Observable.merge(
                                shared.ofType(SendMessageAction.class).compose(sendMessageTransformer),
                                shared.ofType(CheckMessageInputAction.class).compose(checkIfEmptyTransformer)),
                        model.messagesObservable
                                .observeOn(AndroidSchedulers.mainThread())
                )).replay(1).autoConnect();

//      ╔════╗
//   ╔══╣    ╠══╗
// ══╣  ╚════╝  ╠══(replay)>
//   ╚══════════╝

我正在重播最后发出的元素,但我想要的是单独重播流。像这样:

//      ╔════╗
//   ╔══╣    ╠═(replay)═╗
// ══╣  ╚════╝          ╠══>
//   ╚═════════(replay)═╝

因此,当我订阅整个流时,它会发出这两个内部流的最后一个元素。这能做到吗?

编辑:为了让我的问题更清楚,我将扩展我认为问题所在的位置。

我希望整个流成为可连接的可观察对象。这很重要,因为它位于无头片段中,并且不应该重新触发其所有轮换调用。同时我想单独重播这些流。在外部合并中移动 .replay(1).autoConnect() 是行不通的。整个流将在每次新订阅时重新创建。在外部合并中添加自定义运算符,例如:

private ObservableTransformer<Action, Result> actionsToResults =
            actions -> actions.publish(shared -> Observable.merge(Observable.merge(
                    shared.ofType(SendMessageAction.class).compose(sendMessageTransformer),
                    shared.ofType(CheckMessageInputAction.class).compose(checkIfEmptyTransformer)).lift(new ReplayLastOperator<Result>())
                    model.messagesObservable.lift(new ReplayLastOperator<>()).observeOn(AndroidSchedulers.mainThread())
            ));

其中 ReplayLastOperator<Result>() 重播每个新订阅的最后一个元素等同于移动 replay(1).autoConnect(),整个流在每个新订阅时重新创建。所以没有什么可以重播的。上面带有 .publish().autoConnect() 的解决方案,只会订阅一次内部流,所以我失去了检测上游新订阅的能力,因此重播也不起作用。

我 运行 没主意了。

是的。
您可以创建一个自定义运算符来缓存它看到的最后一个项目。

然后当从上游接收到 onComplete 时,它​​会重新发出它在发出 onComplete 本身之前看到的最后一个项目。该运算符的通用形式如下所示:

public class ReplayLast <T>  implements ObservableOperator<T, T> {
@Override
public Observer<? super T> apply(@NonNull final Observer<? super T> observer) throws Exception {
    return new Observer<T>(){

        Disposable disposable;
        T lastItem;  // cache for last item

        @Override
        public void onSubscribe(@NonNull Disposable d) {
            disposable = d;
            observer.onSubscribe(d);
        }

        @Override
        public void onNext(@NonNull T t) {
            lastItem = t;  // cache each item from up stream
            observer.onNext(t); // emit as usual
        }

        @Override
        public void onError(@NonNull Throwable e) {
            observer.onError(e);
        }

        @Override
        public void onComplete() {
            if(lastItem != null){
                // use the cache to emit the item again before signaling the complete downstream
                observer.onNext(lastItem);
            }
            observer.onComplete();
        }
    };
}

那就只需要用在合适的地方就可以了。应该类似于以下内容:

ObservableTransformer<Action, Result> actionsToResults =
        actions -> actions.publish(shared ->
                Observable.merge(
                        Observable.merge(
                                shared.ofType(SendMessageAction.class).compose(sendMessageTransformer),
                                shared.ofType(CheckMessageInputAction.class).compose(checkIfEmptyTransformer)).lift(new ReplayLast()),
                        model.messagesObservable.lift(new ReplayLast())
                                .observeOn(AndroidSchedulers.mainThread())
                )).autoConnect();

我针对我的特殊情况解决了这个问题。我将添加一些额外的信息,供您查看它是否适用于您的情况。

在我的例子中,我想要重放 return 不同类型的所有流,由抽象 class 密封。所以我对这个问题的解决方案是实现我自己的 ObservableTransoformer,命名为 ReplayLastByClassType,它兑现映射中按类型接收的最后一个元素。您只需调用 .compose(ReplayLastByClassType.<SealingClass>instance()ReplayLastByClassType.instance().

即可应用它

转换器在订阅时重新发送兑现值。之后它继续在连续的 onNext 调用中传递元素。

  • 警告 1. 在 onSubscribe 中编辑的元素顺序 return 不一定保留。(在我的情况下这不是必需的)。
  • 警告 2. 即使转换后的可观察对象的所有订阅者取消订阅,上游可观察对象也不会被取消订阅!!!这是设计使然。

这种方法也适用于您,即使您要重播 return 个相同类型的对象也是如此。您必须将结果包装到单独的助手 classes 中以强制分隔 class.

终于定制了 ObservableTransformer,复制粘贴快乐 :)

public final class ReplayLastByClassType<T> implements ObservableTransformer<T, T> {

    private static final ReplayLastByClassType<Object> INSTANCE = new ReplayLastByClassType<>();

    /** The singleton instance of this transformer **/
    @SuppressWarnings("unchecked")
    public static <T> ReplayLastByClassType<T> instance() {
        return (ReplayLastByClassType<T>) INSTANCE;
    }

    private ReplayLastByClassType() {
    }

    @Override
    public Observable<T> apply(Observable<T> upstream) {
        LastSeen<T> lastSeen = new LastSeen<>();
        return new LastSeenObservable<>(upstream.doOnNext(lastSeen).publish().autoConnect(), lastSeen);
    }

    static final class LastSeen<T> implements Consumer<T> {
        ConcurrentHashMap<Class, T> values = new ConcurrentHashMap<>();

        @Override public void accept(T latest) {
            values.put(latest.getClass(), latest);
        }
    }

    static final class LastSeenObservable<T> extends Observable<T> {
        private final Observable<T> upstream;
        private final LastSeen<T> lastSeen;

        LastSeenObservable(Observable<T> upstream, LastSeen<T> lastSeen) {
            this.upstream = upstream;
            this.lastSeen = lastSeen;
        }

        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            upstream.subscribe(new LastSeenObserver<T>(observer, lastSeen));
        }
    }

    static final class LastSeenObserver<T> implements Observer<T> {
        private final Observer<? super T> downstream;
        private final LastSeen<T> lastSeen;

        LastSeenObserver(Observer<? super T> downstream, LastSeen<T> lastSeen) {
            this.downstream = downstream;
            this.lastSeen = lastSeen;
        }

        @Override public void onSubscribe(Disposable d) {
            downstream.onSubscribe(d);

            Map<Class, T> values = lastSeen.values;
            for (T t: values.values()) {
                downstream.onNext(t);
            }
        }

        @Override public void onNext(T value) {
            downstream.onNext(value);
        }

        @Override public void onComplete() {
            downstream.onComplete();
        }

        @Override public void onError(Throwable e) {
            downstream.onError(e);
        }
    }
}

PS:如果您发现代码中有任何错误,请在评论中纠正我。