Vertx,带有阻塞回复的事件总线

Vertx, event bus with blocking reply

我有两个 Verticle, 一个发送一些数据到事件总线,一个从事件总线接收数据然后在睡眠后回复

我的代码是:

发件人:

public class Sender extends AbstractVerticle{

    @Override
    public void start() throws Exception {
        final EventBus eventBus = this.vertx.eventBus();
        this.vertx.setPeriodic(1000, handler->{
            eventBus.<JsonObject>send("test", new JsonObject().put("d", "d"),this::handle);
        });
    }
    private  void handle(AsyncResult<Message<JsonObject>> result) {
        if(result.succeeded()){
            System.out.println("Answer: "+Thread.currentThread().getName());
            System.out.println(result.result().body());
        } else{
            result.cause().printStackTrace();
        }
    }
}

接收者:

public class Receiver extends AbstractVerticle {

    @Override
    public void start() throws Exception {
        final EventBus eventBus = this.vertx.eventBus();
        eventBus.<JsonObject>consumer("test",this::handle);
    }

    private void handle(final Message<JsonObject> event) {
        System.out.println("Get: "+event.body());
        sleep();
        System.out.println("Response: ");
        event.reply("Hello");
    }


    private void sleep(){
        System.out.println(Thread.currentThread().getName());
        try {
            Thread.sleep(2000);
        } catch (InterruptedException ex) {
            Logger.getLogger(Receiver.class.getName()).log(Level.SEVERE, null, ex);
        }
    }
}

我运行两个verticle分别使用集群模式

这是接收器 Verticle 的输出:

Response:
Get: {"d":"d"}
vert.x-eventloop-thread-1
Response:
Get: {"d":"d"}
vert.x-eventloop-thread-1
Response:
Get: {"d":"d"}
vert.x-eventloop-thread-1
Response:
Get: {"d":"d"}
vert.x-eventloop-thread-1
Response:
Get: {"d":"d"}
vert.x-eventloop-thread-1
Response:
Get: {"d":"d"}
vert.x-eventloop-thread-1
Response:
Get: {"d":"d"}

但是 sender verticle 的输出是空的然后它显示错误

 Timed out after waiting 30000(ms) for a reply. address: 26d073b1-e97a-4d9b-88b7-0b9fbe61cb25, repliedAddress: test

我知道在 30 秒内没有结果时会发生这种情况,但为什么当我的睡眠只有 2 秒时会发生这种情况。

第二个问题,也许我错过了官方文档中的一些内容,但是当所有回调的代码都使用事件循环线程时,Vertx 是如何异步的,为了避免这种行为,我们应该为每个长任务创建工作线程?

提前致谢

您提供的代码示例在我的电脑上运行正常 (java 9 + vertx 3.5.1)。 但是,它有一个问题 - 在 Sender verticle 处理程序方法中具有以下签名 - handle(AsyncResult<Message<JsonObject>> result)。这意味着它是针对 JsonObject 的回复实例,但是在 Receiver verticle 中,您使用 event.reply("Hello") 回复,这是一个简单的 sting。因此,您将 Sender verticle 中的处理程序更新为:

JsonObject body = result.result().body();
System.out.println(body);

会有异常:

java.lang.ClassCastException: java.base/java.lang.String cannot be cast to io.vertx.core.json.JsonObject

因此,如果您从 Receiver verticle 回复 JsonObject,或将 Sender 中的回复处理程序更改为使用 String - 它会正常工作。