正确使用 RxJava 去除 Vert.x 回调
Correct use of RxJava to remove Vert.x callbacks
我们正在使用 Vert.x 和 RxJava。
我们的应用程序逻辑正在运行,但是,尽管使用了 RxJava,代码仍然包含嵌套多层的回调。
下面是一个例子。为了突出代码的结构,已删除所有日志记录和附加逻辑。
EventBus eb = vertx.eventBus();
// retrieve incoming messages from the consumer by listening on the event bus
MessageConsumer<BusMessage> consumer = eb.<BusMessage>consumer(address);
Observable<Message<BusMessage>> consumerObservable = consumer.toObservable();
consumerObservable.subscribe(cosumedMsg -> {
Object msg = cosumedMsg.body().encode());
Single<Message<MyObject>> step1 = eb.<MyObject>rxSend(addrs1, msg, deliveryOpts));
step1.subscribe(validationRes -> {
Single<Message<MyObject>> step2 = eb.rxSend(addrs2, msg, deliveryOpts));
step2.subscribe(handlerRes -> {
Single<Message<MyObject>> step3 = eb.rxSend(addrs3, msg, deliveryOpts));
step3.subscribe(dsRes -> {
Single<Message<MyObject>> step4 = eb.rxSend(addrs4, msg, deliveryOpts));
step4.subscribe(msg -> {
msg.body();
}, err -> {
// step1 error handling
});
}, err -> {
// step2 error handling
});
}, err -> {
// step3 error handling
});
}, err -> {
// step4 error handling
});
很可能我没有正确使用 RxJava。
有没有办法使用 RaxJava 的响应式功能重构示例代码以避免嵌套回调?
谢谢
Rxjava 提供了很多运算符来以函数式方式对您的逻辑进行建模,这些嵌套的回调可以使用运算符轻松地转换为逻辑链。您不需要在任何操作员那里得到错误,因为 Rx 已经将这些错误扔到流中,在一个地方,所以您可以相应地进行操作。错误处理是 Rx 中的一个大话题,但这是基本思想.. 接受每个回调
- 使用正确的运算符将每个逻辑步骤转换为 Rx 步骤。
- 处理每个回调错误并将它们放入流中。
在我看来,你的 Rx 链最后应该只有一个 subscribe()
。
您应该使用 flatMap()
链链接您的事件总线调用。类似的东西:
public void start() throws Exception {
EventBus eventBus = vertx.eventBus();
MessageConsumer<BusMessage> consumer = eventBus.consumer("my.address");
Observable<Message<BusMessage>> consumerObservable = consumer.toObservable();
consumerObservable
.flatMapSingle(consumerMsg -> {
final Object msg = consumerMsg.body().encode();
return eventBus.rxSend("addrs1", msg, new DeliveryOptions());
})
.flatMapSingle(step1Msg -> eventBus.rxSend("addrs2", step1Msg, new DeliveryOptions()))
.flatMapSingle(step2Msg -> eventBus.rxSend("addrs3", step2Msg, new DeliveryOptions()))
.subscribe(this::handleFinalMsg,
this::handleException,
this::handleOnComplete);
}
private void handleFinalMsg(final Message<Object> message) {
System.out.println(message.body());
}
private void handleException(final Throwable throwable) {
System.err.println(throwable);
}
private void handleOnComplete() {
System.out.println("it's the end!");
}
任何可能发生的异常都由 this::handleException
处理。
您可以查看此博客 post:https://streamdata.io/blog/vert-x-and-the-async-calls-chain/ (and its associated code) and may be at this code https://github.com/ctranxuan/howtos-vertx/tree/master/eventbus-rxjava 以获得可能的见解。
希望对您有所帮助。
我们正在使用 Vert.x 和 RxJava。 我们的应用程序逻辑正在运行,但是,尽管使用了 RxJava,代码仍然包含嵌套多层的回调。 下面是一个例子。为了突出代码的结构,已删除所有日志记录和附加逻辑。
EventBus eb = vertx.eventBus();
// retrieve incoming messages from the consumer by listening on the event bus
MessageConsumer<BusMessage> consumer = eb.<BusMessage>consumer(address);
Observable<Message<BusMessage>> consumerObservable = consumer.toObservable();
consumerObservable.subscribe(cosumedMsg -> {
Object msg = cosumedMsg.body().encode());
Single<Message<MyObject>> step1 = eb.<MyObject>rxSend(addrs1, msg, deliveryOpts));
step1.subscribe(validationRes -> {
Single<Message<MyObject>> step2 = eb.rxSend(addrs2, msg, deliveryOpts));
step2.subscribe(handlerRes -> {
Single<Message<MyObject>> step3 = eb.rxSend(addrs3, msg, deliveryOpts));
step3.subscribe(dsRes -> {
Single<Message<MyObject>> step4 = eb.rxSend(addrs4, msg, deliveryOpts));
step4.subscribe(msg -> {
msg.body();
}, err -> {
// step1 error handling
});
}, err -> {
// step2 error handling
});
}, err -> {
// step3 error handling
});
}, err -> {
// step4 error handling
});
很可能我没有正确使用 RxJava。 有没有办法使用 RaxJava 的响应式功能重构示例代码以避免嵌套回调?
谢谢
Rxjava 提供了很多运算符来以函数式方式对您的逻辑进行建模,这些嵌套的回调可以使用运算符轻松地转换为逻辑链。您不需要在任何操作员那里得到错误,因为 Rx 已经将这些错误扔到流中,在一个地方,所以您可以相应地进行操作。错误处理是 Rx 中的一个大话题,但这是基本思想.. 接受每个回调
- 使用正确的运算符将每个逻辑步骤转换为 Rx 步骤。
- 处理每个回调错误并将它们放入流中。
在我看来,你的 Rx 链最后应该只有一个 subscribe()
。
您应该使用 flatMap()
链链接您的事件总线调用。类似的东西:
public void start() throws Exception {
EventBus eventBus = vertx.eventBus();
MessageConsumer<BusMessage> consumer = eventBus.consumer("my.address");
Observable<Message<BusMessage>> consumerObservable = consumer.toObservable();
consumerObservable
.flatMapSingle(consumerMsg -> {
final Object msg = consumerMsg.body().encode();
return eventBus.rxSend("addrs1", msg, new DeliveryOptions());
})
.flatMapSingle(step1Msg -> eventBus.rxSend("addrs2", step1Msg, new DeliveryOptions()))
.flatMapSingle(step2Msg -> eventBus.rxSend("addrs3", step2Msg, new DeliveryOptions()))
.subscribe(this::handleFinalMsg,
this::handleException,
this::handleOnComplete);
}
private void handleFinalMsg(final Message<Object> message) {
System.out.println(message.body());
}
private void handleException(final Throwable throwable) {
System.err.println(throwable);
}
private void handleOnComplete() {
System.out.println("it's the end!");
}
任何可能发生的异常都由 this::handleException
处理。
您可以查看此博客 post:https://streamdata.io/blog/vert-x-and-the-async-calls-chain/ (and its associated code) and may be at this code https://github.com/ctranxuan/howtos-vertx/tree/master/eventbus-rxjava 以获得可能的见解。
希望对您有所帮助。