RxJava 使一个观察者失败
RxJava failing one observer from another
假设出现以下情况,问题的 best/advised 解决方案是什么?
我有两个流,一个代表 TCP 连接,另一个代表该 TCP 连接的状态。
一旦状态改变(即断开连接),我想重新获取TCP连接。
我最初的想法是拥有这 2 个流,合并它们并在结果 Observable 上应用 retryWith。第二个流是 PublishSubject 的一个实例,它为我提供了一种非常方便的失败方法。现在,这个想法部分可行,除了当我在发布者上调用 onError() 时,连接流 (#1) 保持 subscribing/unsubscribing 直到用完 retryWhen 设置的限制。
我确定这个问题过去一定已经解决了,您希望保持 TCP 连接,运行,我只是不确定如何从这里取得进展。任何帮助将不胜感激。
您不需要两个流。就用一个吧。典型的构造将涉及 Observable.using()
创建套接字,在该套接字上建立可观察对象并处理关闭套接字,然后将其与 retry()
链接(通常有延迟)。
return Observable.using(new Func0<XMPPTCPConnection>() {
@Override
public XMPPTCPConnection call() {
L.log("creating connection");
connection = _createConnection();
return connection;
}
}, new Func1<XMPPTCPConnection, Observable<?>>() {
@Override
public Observable<?> call(final XMPPTCPConnection connection) {
try {
_authenticate(connection, username, password);
} catch (Exception e) {
throw new RuntimeException(e);
}
return Observable.just(connection)
.repeatWhen(new RepeatWhenOperator(-1, 1000))
.map(new Func1<XMPPTCPConnection, Object>() {
@Override
public Object call(XMPPTCPConnection connection) {
if (!connection.isConnected()) {
throw new RuntimeException("Disconnected");
}
return null;
}
});
}
}, new Action1<XMPPTCPConnection>() {
@Override
public void call(XMPPTCPConnection connection) {
L.log("disposing");
_disconnect();
}
})
.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.retry();
好的,基于 Observable.using() 我想出了这个解决方案。它有效,但我对解决方案不满意 - 即每秒轮询连接是否仍然存在。 XMPPTCPConnection 为我提供了连接断开时的侦听器,这似乎是一个更好的解决方案 - 只是不确定如何合并...
_disconnect()方法其实就是断开连接dispose的意思;在当前情况下,它可能是一个糟糕的名称,但导致抛出 RuntimeException 的断开连接发生在该系统之外。
如有任何想法或改进,我们将不胜感激!
假设出现以下情况,问题的 best/advised 解决方案是什么?
我有两个流,一个代表 TCP 连接,另一个代表该 TCP 连接的状态。 一旦状态改变(即断开连接),我想重新获取TCP连接。
我最初的想法是拥有这 2 个流,合并它们并在结果 Observable 上应用 retryWith。第二个流是 PublishSubject 的一个实例,它为我提供了一种非常方便的失败方法。现在,这个想法部分可行,除了当我在发布者上调用 onError() 时,连接流 (#1) 保持 subscribing/unsubscribing 直到用完 retryWhen 设置的限制。
我确定这个问题过去一定已经解决了,您希望保持 TCP 连接,运行,我只是不确定如何从这里取得进展。任何帮助将不胜感激。
您不需要两个流。就用一个吧。典型的构造将涉及 Observable.using()
创建套接字,在该套接字上建立可观察对象并处理关闭套接字,然后将其与 retry()
链接(通常有延迟)。
return Observable.using(new Func0<XMPPTCPConnection>() {
@Override
public XMPPTCPConnection call() {
L.log("creating connection");
connection = _createConnection();
return connection;
}
}, new Func1<XMPPTCPConnection, Observable<?>>() {
@Override
public Observable<?> call(final XMPPTCPConnection connection) {
try {
_authenticate(connection, username, password);
} catch (Exception e) {
throw new RuntimeException(e);
}
return Observable.just(connection)
.repeatWhen(new RepeatWhenOperator(-1, 1000))
.map(new Func1<XMPPTCPConnection, Object>() {
@Override
public Object call(XMPPTCPConnection connection) {
if (!connection.isConnected()) {
throw new RuntimeException("Disconnected");
}
return null;
}
});
}
}, new Action1<XMPPTCPConnection>() {
@Override
public void call(XMPPTCPConnection connection) {
L.log("disposing");
_disconnect();
}
})
.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.retry();
好的,基于 Observable.using() 我想出了这个解决方案。它有效,但我对解决方案不满意 - 即每秒轮询连接是否仍然存在。 XMPPTCPConnection 为我提供了连接断开时的侦听器,这似乎是一个更好的解决方案 - 只是不确定如何合并...
_disconnect()方法其实就是断开连接dispose的意思;在当前情况下,它可能是一个糟糕的名称,但导致抛出 RuntimeException 的断开连接发生在该系统之外。
如有任何想法或改进,我们将不胜感激!