Vertx Rx-Java: eventBus订阅者被取消订阅的原因
Vertx Rx-Java: reasons for eventBus subscriber being unsubscribed
我正在使用 vertx 和 rx-java。
我有一个 Verticle,它订阅了具有特定地址的 eventBus 上的事件:
eventBus.localConsumer(some_addres)
.toObservable()
.subscribe(message -> {
...
message.reply(...);
})
... same for other addresses...
其他 Verticle 正在使用以下方式发送事件:
eventBus.rxSend(some_address, message, new DeployOptions().setSendTimeout(60000));
Verticles 是通过 RxHelper.deployVerticle
.
创建的
一切正常,但是过了一段时间后,其中一个地址被取消订阅,并且此事件的所有请求现在都失败并出现 ReplyException: No handlers for address some_ddress
错误,所有其他地址仍然被订阅。
我在日志中没有看到任何顶点错误。
消费者从它正在监听的特定地址自动取消订阅的原因是什么?
据我所知:如果请求因错误或超时而失败,它不应该导致取消订阅,所以我真的不明白什么会导致这种行为。(我没有任何明确的unsubscribe
个电话)
看来问题是message.reply
之前的代码有时会抛出异常:
eventBus.localConsumer(some_addres)
.toObservable()
.subscribe(message -> {
... <-- exception here
message.reply(...);
})
简单修复:
eventBus.localConsumer(some_addres)
.toObservable()
.subscribe(message -> {
try {
... <-- exception here
message.reply(...);
} catch (Exception e) {
...handle exception...
message.error(...);
}
})
我正在使用 vertx 和 rx-java。
我有一个 Verticle,它订阅了具有特定地址的 eventBus 上的事件:
eventBus.localConsumer(some_addres)
.toObservable()
.subscribe(message -> {
...
message.reply(...);
})
... same for other addresses...
其他 Verticle 正在使用以下方式发送事件:
eventBus.rxSend(some_address, message, new DeployOptions().setSendTimeout(60000));
Verticles 是通过 RxHelper.deployVerticle
.
一切正常,但是过了一段时间后,其中一个地址被取消订阅,并且此事件的所有请求现在都失败并出现 ReplyException: No handlers for address some_ddress
错误,所有其他地址仍然被订阅。
我在日志中没有看到任何顶点错误。
消费者从它正在监听的特定地址自动取消订阅的原因是什么?
据我所知:如果请求因错误或超时而失败,它不应该导致取消订阅,所以我真的不明白什么会导致这种行为。(我没有任何明确的unsubscribe
个电话)
看来问题是message.reply
之前的代码有时会抛出异常:
eventBus.localConsumer(some_addres)
.toObservable()
.subscribe(message -> {
... <-- exception here
message.reply(...);
})
简单修复:
eventBus.localConsumer(some_addres)
.toObservable()
.subscribe(message -> {
try {
... <-- exception here
message.reply(...);
} catch (Exception e) {
...handle exception...
message.error(...);
}
})