Vertx,带有阻塞回复的事件总线
Vertx, event bus with blocking reply
我有两个 Verticle,
一个发送一些数据到事件总线,一个从事件总线接收数据然后在睡眠后回复
我的代码是:
发件人:
public class Sender extends AbstractVerticle{
@Override
public void start() throws Exception {
final EventBus eventBus = this.vertx.eventBus();
this.vertx.setPeriodic(1000, handler->{
eventBus.<JsonObject>send("test", new JsonObject().put("d", "d"),this::handle);
});
}
private void handle(AsyncResult<Message<JsonObject>> result) {
if(result.succeeded()){
System.out.println("Answer: "+Thread.currentThread().getName());
System.out.println(result.result().body());
} else{
result.cause().printStackTrace();
}
}
}
接收者:
public class Receiver extends AbstractVerticle {
@Override
public void start() throws Exception {
final EventBus eventBus = this.vertx.eventBus();
eventBus.<JsonObject>consumer("test",this::handle);
}
private void handle(final Message<JsonObject> event) {
System.out.println("Get: "+event.body());
sleep();
System.out.println("Response: ");
event.reply("Hello");
}
private void sleep(){
System.out.println(Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException ex) {
Logger.getLogger(Receiver.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
我运行两个verticle分别使用集群模式
这是接收器 Verticle 的输出:
Response:
Get: {"d":"d"}
vert.x-eventloop-thread-1
Response:
Get: {"d":"d"}
vert.x-eventloop-thread-1
Response:
Get: {"d":"d"}
vert.x-eventloop-thread-1
Response:
Get: {"d":"d"}
vert.x-eventloop-thread-1
Response:
Get: {"d":"d"}
vert.x-eventloop-thread-1
Response:
Get: {"d":"d"}
vert.x-eventloop-thread-1
Response:
Get: {"d":"d"}
但是 sender verticle 的输出是空的然后它显示错误
Timed out after waiting 30000(ms) for a reply. address: 26d073b1-e97a-4d9b-88b7-0b9fbe61cb25, repliedAddress: test
我知道在 30 秒内没有结果时会发生这种情况,但为什么当我的睡眠只有 2 秒时会发生这种情况。
第二个问题,也许我错过了官方文档中的一些内容,但是当所有回调的代码都使用事件循环线程时,Vertx 是如何异步的,为了避免这种行为,我们应该为每个长任务创建工作线程?
提前致谢
您提供的代码示例在我的电脑上运行正常 (java 9 + vertx 3.5.1)。
但是,它有一个问题 - 在 Sender
verticle 处理程序方法中具有以下签名 - handle(AsyncResult<Message<JsonObject>> result)
。这意味着它是针对 JsonObject 的回复实例,但是在 Receiver
verticle 中,您使用 event.reply("Hello")
回复,这是一个简单的 sting。因此,您将 Sender
verticle 中的处理程序更新为:
JsonObject body = result.result().body();
System.out.println(body);
会有异常:
java.lang.ClassCastException: java.base/java.lang.String cannot be cast to io.vertx.core.json.JsonObject
因此,如果您从 Receiver
verticle 回复 JsonObject
,或将 Sender
中的回复处理程序更改为使用 String
- 它会正常工作。
我有两个 Verticle, 一个发送一些数据到事件总线,一个从事件总线接收数据然后在睡眠后回复
我的代码是:
发件人:
public class Sender extends AbstractVerticle{
@Override
public void start() throws Exception {
final EventBus eventBus = this.vertx.eventBus();
this.vertx.setPeriodic(1000, handler->{
eventBus.<JsonObject>send("test", new JsonObject().put("d", "d"),this::handle);
});
}
private void handle(AsyncResult<Message<JsonObject>> result) {
if(result.succeeded()){
System.out.println("Answer: "+Thread.currentThread().getName());
System.out.println(result.result().body());
} else{
result.cause().printStackTrace();
}
}
}
接收者:
public class Receiver extends AbstractVerticle {
@Override
public void start() throws Exception {
final EventBus eventBus = this.vertx.eventBus();
eventBus.<JsonObject>consumer("test",this::handle);
}
private void handle(final Message<JsonObject> event) {
System.out.println("Get: "+event.body());
sleep();
System.out.println("Response: ");
event.reply("Hello");
}
private void sleep(){
System.out.println(Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException ex) {
Logger.getLogger(Receiver.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
我运行两个verticle分别使用集群模式
这是接收器 Verticle 的输出:
Response:
Get: {"d":"d"}
vert.x-eventloop-thread-1
Response:
Get: {"d":"d"}
vert.x-eventloop-thread-1
Response:
Get: {"d":"d"}
vert.x-eventloop-thread-1
Response:
Get: {"d":"d"}
vert.x-eventloop-thread-1
Response:
Get: {"d":"d"}
vert.x-eventloop-thread-1
Response:
Get: {"d":"d"}
vert.x-eventloop-thread-1
Response:
Get: {"d":"d"}
但是 sender verticle 的输出是空的然后它显示错误
Timed out after waiting 30000(ms) for a reply. address: 26d073b1-e97a-4d9b-88b7-0b9fbe61cb25, repliedAddress: test
我知道在 30 秒内没有结果时会发生这种情况,但为什么当我的睡眠只有 2 秒时会发生这种情况。
第二个问题,也许我错过了官方文档中的一些内容,但是当所有回调的代码都使用事件循环线程时,Vertx 是如何异步的,为了避免这种行为,我们应该为每个长任务创建工作线程?
提前致谢
您提供的代码示例在我的电脑上运行正常 (java 9 + vertx 3.5.1)。
但是,它有一个问题 - 在 Sender
verticle 处理程序方法中具有以下签名 - handle(AsyncResult<Message<JsonObject>> result)
。这意味着它是针对 JsonObject 的回复实例,但是在 Receiver
verticle 中,您使用 event.reply("Hello")
回复,这是一个简单的 sting。因此,您将 Sender
verticle 中的处理程序更新为:
JsonObject body = result.result().body();
System.out.println(body);
会有异常:
java.lang.ClassCastException: java.base/java.lang.String cannot be cast to io.vertx.core.json.JsonObject
因此,如果您从 Receiver
verticle 回复 JsonObject
,或将 Sender
中的回复处理程序更改为使用 String
- 它会正常工作。