当部署为单独的 fat-jars 时,Vertx 服务在本地 JVM 上通过有限数据集 运行 时不连续接受消息
Vertx services not accepting messages continuously when running on local JVM over a finite set of data when deployed as separate fat-jars
我开始使用 vertx 并尝试在事件总线上进行点对点消息传递。我有 2 个服务,它们都创建为单独的 Maven 项目并部署为 fat-jars
1) 从文件中读取内容并将内容作为消息通过地址发送 - ContentParserService.java
2) 阅读消息并回复收到的消息- PingService.java
这两个服务都部署为独立的 jar 类型的微服务方式
代码如下:ContentParserService.java
@Override
public void start(Future<Void> startFuture) throws Exception {
super.start(startFuture);
// Reference to the eventbus running on JVM
EventBus eventBus = vertx.eventBus();
// Read file using normal java mechanism
try {
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(ClassLoader.getSystemResourceAsStream(
(config().getString("filename")))));
bufferedReader.readLine(); //read first line
String line = null;
while ((line = bufferedReader.readLine()) != null) {
String[] data = line.split(",");
// Create RealEstate Object
RealEstateTransaction realEstateData = createTransactionObject(data);
// Construct Message JSON
JsonObject messageJSON = constructMessageJson(realEstateData);
// Send message to PING address over the Event Bus
eventBus.send("PING", Json.encode(messageJSON), reply -> {
if (reply.succeeded())
System.out.println("Received Reply: " + reply.result().body());
else {
System.out.println("No reply");
}
});
}
} catch (IOException e) {
startFuture.fail(e.getMessage());
}
代码如下:PingService.java
@Override
public void start(Future<Void> startFuture) throws Exception {
super.start(startFuture);
System.out.println("Referencing event bus");
// Reference to the event bus running on the JVM
EventBus eventBus = vertx.eventBus();
System.out.println("Creating HttpServer");
// Create HTTP Server to handle incoming requests
HttpServer httpServer = vertx.createHttpServer();
System.out.println("Creating Router");
// Create Router for routing to appropriate endpoint
Router router = Router.router(vertx);
System.out.println("Starting to consume message sent over event bus");
// Consume the incoming message over the address PING
eventBus.consumer("PING", event -> {
System.out.println("Received message: " + event.body());
event.reply("Received at PING address");
});
System.out.println("Receiver ready and receiving messages");
当我 运行 这两个服务时,我 运行 在同一台机器上为每个服务使用 java -jar 命令。我观察到的是,当我部署 ContentParserService 的第一个 jar 时,它立即启动并通过事件总线发送消息,但是当我启动 pingservice jar 时,它无法接收通过事件总线发送的任何消息,因为我的 pingService本身是一个单独的 fatjar 和微服务。我正在阅读的文件是一个包含大约 200 个条目的有限长度 csv 文件。如果我将这两种服务捆绑在一个 fat jar 中,这种情况就会起作用。
在我的案例中,我应该如何实现能够相互发送消息的不同 fat jars 服务。
当两个 Verticle 都在同一个 jar 中时,这种情况才有效,因为没有网络延迟。但是您的 EventBus 用例不正确,因为它不会保留消息,因此无法重播它们。您应该仅在对方准备好接收消息时才开始发送消息。
您需要扭转依赖关系。在你的 ContentParserService
中注册一些 "ready" 事件,然后只有在你得到它时才开始你的 while
循环:
vertx.eventBus().consumer("ready", (message) -> {
while ((line = bufferedReader.readLine()) != null) {
...
}
});
现在,如果 ContentParserService
实际上变慢并且错过了 "ready" 事件会怎样?为此使用 vertx.setPeriodic()
。因此,您启动 PingService
,并定期通知 ContentParserService
您已准备好接收一些消息。
或者,作为一种选择,根本不在服务之间使用 EventBus,而是切换到具有持久性的东西,例如 RabbitMQ 或 Kafka。
我开始使用 vertx 并尝试在事件总线上进行点对点消息传递。我有 2 个服务,它们都创建为单独的 Maven 项目并部署为 fat-jars
1) 从文件中读取内容并将内容作为消息通过地址发送 - ContentParserService.java
2) 阅读消息并回复收到的消息- PingService.java
这两个服务都部署为独立的 jar 类型的微服务方式
代码如下:ContentParserService.java
@Override
public void start(Future<Void> startFuture) throws Exception {
super.start(startFuture);
// Reference to the eventbus running on JVM
EventBus eventBus = vertx.eventBus();
// Read file using normal java mechanism
try {
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(ClassLoader.getSystemResourceAsStream(
(config().getString("filename")))));
bufferedReader.readLine(); //read first line
String line = null;
while ((line = bufferedReader.readLine()) != null) {
String[] data = line.split(",");
// Create RealEstate Object
RealEstateTransaction realEstateData = createTransactionObject(data);
// Construct Message JSON
JsonObject messageJSON = constructMessageJson(realEstateData);
// Send message to PING address over the Event Bus
eventBus.send("PING", Json.encode(messageJSON), reply -> {
if (reply.succeeded())
System.out.println("Received Reply: " + reply.result().body());
else {
System.out.println("No reply");
}
});
}
} catch (IOException e) {
startFuture.fail(e.getMessage());
}
代码如下:PingService.java
@Override
public void start(Future<Void> startFuture) throws Exception {
super.start(startFuture);
System.out.println("Referencing event bus");
// Reference to the event bus running on the JVM
EventBus eventBus = vertx.eventBus();
System.out.println("Creating HttpServer");
// Create HTTP Server to handle incoming requests
HttpServer httpServer = vertx.createHttpServer();
System.out.println("Creating Router");
// Create Router for routing to appropriate endpoint
Router router = Router.router(vertx);
System.out.println("Starting to consume message sent over event bus");
// Consume the incoming message over the address PING
eventBus.consumer("PING", event -> {
System.out.println("Received message: " + event.body());
event.reply("Received at PING address");
});
System.out.println("Receiver ready and receiving messages");
当我 运行 这两个服务时,我 运行 在同一台机器上为每个服务使用 java -jar 命令。我观察到的是,当我部署 ContentParserService 的第一个 jar 时,它立即启动并通过事件总线发送消息,但是当我启动 pingservice jar 时,它无法接收通过事件总线发送的任何消息,因为我的 pingService本身是一个单独的 fatjar 和微服务。我正在阅读的文件是一个包含大约 200 个条目的有限长度 csv 文件。如果我将这两种服务捆绑在一个 fat jar 中,这种情况就会起作用。
在我的案例中,我应该如何实现能够相互发送消息的不同 fat jars 服务。
当两个 Verticle 都在同一个 jar 中时,这种情况才有效,因为没有网络延迟。但是您的 EventBus 用例不正确,因为它不会保留消息,因此无法重播它们。您应该仅在对方准备好接收消息时才开始发送消息。
您需要扭转依赖关系。在你的 ContentParserService
中注册一些 "ready" 事件,然后只有在你得到它时才开始你的 while
循环:
vertx.eventBus().consumer("ready", (message) -> {
while ((line = bufferedReader.readLine()) != null) {
...
}
});
现在,如果 ContentParserService
实际上变慢并且错过了 "ready" 事件会怎样?为此使用 vertx.setPeriodic()
。因此,您启动 PingService
,并定期通知 ContentParserService
您已准备好接收一些消息。
或者,作为一种选择,根本不在服务之间使用 EventBus,而是切换到具有持久性的东西,例如 RabbitMQ 或 Kafka。