正确使用 RxJava 去除 Vert.x 回调

Correct use of RxJava to remove Vert.x callbacks

我们正在使用 Vert.xRxJava。 我们的应用程序逻辑正在运行,但是,尽管使用了 RxJava,代码仍然包含嵌套多层的回调。 下面是一个例子。为了突出代码的结构,已删除所有日志记录和附加逻辑。

EventBus eb = vertx.eventBus();
// retrieve incoming messages from the consumer by listening on the event bus
MessageConsumer<BusMessage> consumer = eb.<BusMessage>consumer(address);
Observable<Message<BusMessage>> consumerObservable = consumer.toObservable();
consumerObservable.subscribe(cosumedMsg -> {
  Object msg = cosumedMsg.body().encode());
  Single<Message<MyObject>> step1 = eb.<MyObject>rxSend(addrs1, msg, deliveryOpts));
  step1.subscribe(validationRes -> {
    Single<Message<MyObject>> step2 = eb.rxSend(addrs2, msg, deliveryOpts));
    step2.subscribe(handlerRes -> {
      Single<Message<MyObject>> step3 = eb.rxSend(addrs3, msg, deliveryOpts));
        step3.subscribe(dsRes -> {
          Single<Message<MyObject>> step4 = eb.rxSend(addrs4, msg, deliveryOpts));
            step4.subscribe(msg -> {
              msg.body();
            }, err -> {
            // step1 error handling
          });
        }, err -> {
        // step2 error handling
      });
    }, err -> {
    //  step3 error handling
  });
 }, err -> {
 // step4 error handling
});

很可能我没有正确使用 RxJava。 有没有办法使用 RaxJava 的响应式功能重构示例代码以避免嵌套回调?

谢谢

Rxjava 提供了很多运算符来以函数式方式对您的逻辑进行建模,这些嵌套的回调可以使用运算符轻松地转换为逻辑链。您不需要在任何操作员那里得到错误,因为 Rx 已经将这些错误扔到流中,在一个地方,所以您可以相应地进行操作。错误处理是 Rx 中的一个大话题,但这是基本思想.. 接受每个回调

  • 使用正确的运算符将每个逻辑步骤转换为 Rx 步骤。
  • 处理每个回调错误并将它们放入流中。

在我看来,你的 Rx 链最后应该只有一个 subscribe()。 您应该使用 flatMap() 链链接您的事件总线调用。类似的东西:

    public void start() throws Exception {
        EventBus eventBus = vertx.eventBus();
        MessageConsumer<BusMessage> consumer = eventBus.consumer("my.address");
        Observable<Message<BusMessage>> consumerObservable = consumer.toObservable();
        consumerObservable
                .flatMapSingle(consumerMsg -> {
                    final Object msg = consumerMsg.body().encode();
                    return eventBus.rxSend("addrs1", msg, new DeliveryOptions());
                })
                .flatMapSingle(step1Msg -> eventBus.rxSend("addrs2", step1Msg, new DeliveryOptions()))
                .flatMapSingle(step2Msg -> eventBus.rxSend("addrs3", step2Msg, new DeliveryOptions()))
                .subscribe(this::handleFinalMsg,
                           this::handleException,
                           this::handleOnComplete);
    }

    private void handleFinalMsg(final Message<Object> message) {
        System.out.println(message.body());
    }

    private void handleException(final Throwable throwable) {
        System.err.println(throwable);
    }

    private void handleOnComplete() {
        System.out.println("it's the end!");
    }

任何可能发生的异常都由 this::handleException 处理。

您可以查看此博客 post:https://streamdata.io/blog/vert-x-and-the-async-calls-chain/ (and its associated code) and may be at this code https://github.com/ctranxuan/howtos-vertx/tree/master/eventbus-rxjava 以获得可能的见解。

希望对您有所帮助。