RxJava 使一个观察者失败

RxJava failing one observer from another

假设出现以下情况,问题的 best/advised 解决方案是什么?

我有两个流,一个代表 TCP 连接,另一个代表该 TCP 连接的状态。 一旦状态改变(即断开连接),我想重新获取TCP连接。

我最初的想法是拥有这 2 个流,合并它们并在结果 Observable 上应用 retryWith。第二个流是 PublishSubject 的一个实例,它为我提供了一种非常方便的失败方法。现在,这个想法部分可行,除了当我在发布者上调用 onError() 时,连接流 (#1) 保持 subscribing/unsubscribing 直到用完 retryWhen 设置的限制。

我确定这个问题过去一定已经解决了,您希望保持 TCP 连接,运行,我只是不确定如何从这里取得进展。任何帮助将不胜感激。

您不需要两个流。就用一个吧。典型的构造将涉及 Observable.using() 创建套接字,在该套接字上建立可观察对象并处理关闭套接字,然后将其与 retry() 链接(通常有延迟)。

 return Observable.using(new Func0<XMPPTCPConnection>() {
        @Override
        public XMPPTCPConnection call() {
            L.log("creating connection");
            connection = _createConnection();
            return connection;
        }
    }, new Func1<XMPPTCPConnection, Observable<?>>() {
        @Override
        public Observable<?> call(final XMPPTCPConnection connection) {

            try {
                _authenticate(connection, username, password);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }

            return Observable.just(connection)
                    .repeatWhen(new RepeatWhenOperator(-1, 1000))
                    .map(new Func1<XMPPTCPConnection, Object>() {
                        @Override
                        public Object call(XMPPTCPConnection connection) {
                            if (!connection.isConnected()) {
                                throw new RuntimeException("Disconnected");
                            }

                            return null;
                        }
                    });
        }
    }, new Action1<XMPPTCPConnection>() {
        @Override
        public void call(XMPPTCPConnection connection) {
            L.log("disposing");
            _disconnect();
        }
    })
            .subscribeOn(Schedulers.io())
            .unsubscribeOn(Schedulers.io())
            .retry();

好的,基于 Observable.using() 我想出了这个解决方案。它有效,但我对解决方案不满意 - 即每秒轮询连接是否仍然存在。 XMPPTCPConnection 为我提供了连接断开时的侦听器,这似乎是一个更好的解决方案 - 只是不确定如何合并...

_disconnect()方法其实就是断开连接dispose的意思;在当前情况下,它可能是一个糟糕的名称,但导致抛出 RuntimeException 的断开连接发生在该系统之外。

如有任何想法或改进,我们将不胜感激!