Vert.x executeBlocking: different behaviour
I am experimenting with Vert.x executeBlocking to simulate a real-time scenario. This is what I did:
vertx.setPeriodic(1000, id -> {
    counter += 1;
    LOGGER.info("invoked method {} ", counter);
    vertx.executeBlocking(future -> {
        // counter is read here, when the blocking task actually runs
        int counterFinal = counter;
        String result = service.blockingMethod("cycle " + counterFinal + " executed");
        future.complete(result);
    }, res -> {
        LOGGER.info(String.format("The result is: %s", res.result()));
    });
});
The blocking method is very simple:
public String blockingMethod(String result) {
    block(2);
    return result;
}
This is the result:
07:50:27.742 [vert.x-eventloop-thread-0] INFO lab.async.base.verticle.AsyncExperimentalVerticle - invoked method 1
07:50:28.742 [vert.x-eventloop-thread-0] INFO lab.async.base.verticle.AsyncExperimentalVerticle - invoked method 2
07:50:29.740 [vert.x-eventloop-thread-0] INFO lab.async.base.verticle.AsyncExperimentalVerticle - invoked method 3
07:50:29.764 [vert.x-eventloop-thread-0] INFO lab.async.base.verticle.AsyncExperimentalVerticle - The result is: cycle 1 executed
07:50:30.739 [vert.x-eventloop-thread-0] INFO lab.async.base.verticle.AsyncExperimentalVerticle - invoked method 4
07:50:31.739 [vert.x-eventloop-thread-0] INFO lab.async.base.verticle.AsyncExperimentalVerticle - invoked method 5
07:50:31.773 [vert.x-eventloop-thread-0] INFO lab.async.base.verticle.AsyncExperimentalVerticle - The result is: cycle 3 executed
07:50:32.751 [vert.x-eventloop-thread-0] INFO lab.async.base.verticle.AsyncExperimentalVerticle - invoked method 6
07:50:33.748 [vert.x-eventloop-thread-0] INFO lab.async.base.verticle.AsyncExperimentalVerticle - invoked method 7
07:50:33.789 [vert.x-eventloop-thread-0] INFO lab.async.base.verticle.AsyncExperimentalVerticle - The result is: cycle 5 executed
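It is quite evident that, on average, two events are missed, since the delay is set to 2 seconds.
Then I wrapped the blocking method inside a class and executed it as follows: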
vertx.setPeriodic(1000, id -> {
    counter++;
    LOGGER.info("invoked method {} ", counter);
    service.wrapperMethod("Hello", counter, new Handler<AsyncClass>() {
        @Override
        public void handle(AsyncClass event) {
            vertx.executeBlocking(future -> {
                String result = event.result();
                future.complete(result);
            }, res -> {
                LOGGER.info(String.format("The result is: %s", res.result()));
            });
        }
    });
});
The wrapper method is designed like this:
public void wrapperMethod(String input, int cycle, Handler<AsyncClass> execute) {
    // my custom class where the result method has a 2 sec delay
    AsyncClass instance = new AsyncClass(input, String.valueOf(cycle));
    execute.handle(instance);
}
Then I got the expected result:
08:08:27.358 [vert.x-eventloop-thread-0] INFO lab.async.base.verticle.TestVerticle2 - invoked method 1
08:08:27.368 [vert.x-worker-thread-0] INFO lab.async.base.support.AsyncClass - Invoking method inside AsyncClass class
08:08:28.338 [vert.x-eventloop-thread-0] INFO lab.async.base.verticle.TestVerticle2 - invoked method 2
08:08:29.345 [vert.x-eventloop-thread-0] INFO lab.async.base.verticle.TestVerticle2 - invoked method 3
08:08:29.384 [vert.x-worker-thread-0] INFO lab.async.base.support.AsyncClass - Invoking method inside AsyncClass class
08:08:29.386 [vert.x-eventloop-thread-0] INFO lab.async.base.verticle.TestVerticle2 - The result is: Hello world of cycle 1
08:08:30.347 [vert.x-eventloop-thread-0] INFO lab.async.base.verticle.TestVerticle2 - invoked method 4
08:08:31.351 [vert.x-eventloop-thread-0] INFO lab.async.base.verticle.TestVerticle2 - invoked method 5
08:08:31.391 [vert.x-worker-thread-0] INFO lab.async.base.support.AsyncClass - Invoking method inside AsyncClass class
08:08:31.391 [vert.x-eventloop-thread-0] INFO lab.async.base.verticle.TestVerticle2 - The result is: Hello world of cycle 2
08:08:32.341 [vert.x-eventloop-thread-0] INFO lab.async.base.verticle.TestVerticle2 - invoked method 6
08:08:33.343 [vert.x-eventloop-thread-0] INFO lab.async.base.verticle.TestVerticle2 - invoked method 7
08:08:33.396 [vert.x-worker-thread-0] INFO lab.async.base.support.AsyncClass - Invoking method inside AsyncClass class
08:08:33.397 [vert.x-eventloop-thread-0] INFO lab.async.base.verticle.TestVerticle2 - The result is: Hello world of cycle 3
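Now I see asynchronous execution without missing a single event. I cannot find a possible explanation.
Even in the wrapper method, if I add a delay of n seconds, it misses events as expected.
Could someone please help me understand this behaviour?
Update 1:
For the second scenario, the structure of AsyncClass is as follows: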
public class AsyncClass {
    private static final Logger LOGGER = LoggerFactory.getLogger(AsyncClass.class);
    private String input;
    private String cycle;

    public AsyncClass(String input, String cycle) {
        this.input = input;
        this.cycle = cycle;
    }

    public String result() {
        LOGGER.info("Invoking method inside AsyncClass class");
        block(2);
        return input + " world of cycle " + cycle;
    }

    private void block(int pauseLimitInSecond) {
        try {
            TimeUnit.SECONDS.sleep(pauseLimitInSecond);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOGGER.error("exception - > ", e);
        }
    }
}
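The executeBlocking method you are using ensures that blocking tasks are executed in order (one after the other).
So by the time the second blocking task executes (two seconds later), the counter variable has already been incremented twice. Hence the 1, 3, 5 series.
In your other attempt, with the wrapper class, the value of the counter variable is captured before executeBlocking is called. So when the blocking task executes, you get the value you expect.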
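The arithmetic behind the 1, 3, 5 series can be worked out with a small timeline model. This is only a sketch, not Vert.x code: the class and method names are hypothetical, the timer is assumed to tick at exact multiples of the period, and a tick that fires at the very instant a task starts is assumed to be counted first (which matches the log above, where "invoked method 3" is printed just before the first result).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of ordered blocking tasks fed by a periodic timer.
class OrderedBlockingTimeline {

    /**
     * Returns the counter value that each of the first `tasks` blocking tasks
     * reads when it starts, assuming tasks run strictly one after another.
     */
    static List<Integer> valuesReadAtStart(int tasks, long periodMs, long taskMs) {
        List<Integer> seen = new ArrayList<>();
        long workerFreeAt = 0; // time at which the previous task finishes
        for (int n = 1; n <= tasks; n++) {
            long queuedAt = (n - 1) * periodMs;              // tick n fires and queues task n
            long startAt = Math.max(queuedAt, workerFreeAt); // wait for the previous task
            // Every tick fired up to and including startAt has already incremented the counter.
            int counterAtStart = (int) (startAt / periodMs) + 1;
            seen.add(counterAtStart);
            workerFreeAt = startAt + taskMs;
        }
        return seen;
    }

    public static void main(String[] args) {
        // 1 s period, 2 s blocking call, as in the question
        System.out.println(valuesReadAtStart(3, 1000, 2000)); // prints [1, 3, 5]
    }
}
```

With a blocking call shorter than the period (for example 500 ms), the same model returns 1, 2, 3, because each task starts before the next tick fires.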
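The observed behaviour is due to variable capture in the lambda body, which captures the outer class reference (this).
The outer class instance variable counter will have changed its value (it has been incremented past the value it had when the task was submitted) by the time the lambda body is evaluated, which gives the illusion of unexpected behaviour.
Keeping the same program order, you can replace the lambda expression body with a Handler<Future<String>> implementation, in which the value of the outer instance variable counter is stored in another instance variable that is then used in the handler's body: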
private static final class BlockingHandler implements Handler<Future<String>> {
    private final YourBlockingService service;
    private final int counter; // copy of the counter taken when the handler is created

    public BlockingHandler(int counter, YourBlockingService service) {
        this.counter = counter;
        this.service = service;
    }

    @Override
    public void handle(Future<String> event) {
        // uses the stored copy, not the verticle's live counter field
        String result = service.blockingMethod("cycle " + this.counter + " executed");
        event.complete(result);
    }
}
Your verticle code would then look like this:
this.vertx.setPeriodic(
    1000,
    id -> {
        counter += 1;
        LOGGER.info("invoked method {}", counter);
        vertx.executeBlocking(
            new BlockingHandler(this.counter, this.service),
            res -> LOGGER.info(String.format("The result is: %s", res.result()))
        );
    }
);
To sum up, the behaviour described above is related only to closure semantics, not to Vert.x internals.
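Since the effect comes from closure semantics rather than from Vert.x, it can be reproduced with the JDK alone. The sketch below is an illustration under assumptions, not the asker's code: a single-thread executor stands in for ordered executeBlocking, a latch keeps the worker busy so the tasks queue up, and the class name CaptureDemo is hypothetical. Each task reports both the live field (read when the task runs) and a local snapshot (copied when the task is submitted).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical JDK-only reproduction of "field read late" vs "value copied early".
class CaptureDemo {
    private int counter = 0; // instance field, like the verticle's counter

    List<String> run() {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch gate = new CountDownLatch(1);

        // Keep the single worker busy so the tasks below have to wait in the queue.
        worker.submit(() -> { gate.await(); return null; });

        for (int i = 0; i < 3; i++) {
            counter++;
            final int snapshot = counter; // copied now, on the submitting thread
            // "counter" inside the lambda is really this.counter, read when the task runs.
            worker.execute(() -> seen.add("field=" + counter + " snapshot=" + snapshot));
        }

        gate.countDown(); // release the worker; queued tasks now run in order
        worker.shutdown();
        try {
            worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        // prints:
        // field=3 snapshot=1
        // field=3 snapshot=2
        // field=3 snapshot=3
        new CaptureDemo().run().forEach(System.out::println);
    }
}
```

The same idea applies inside the verticle: copying counter into a local variable before calling executeBlocking, and using that local inside the blocking lambda, should give the same result as the dedicated handler class.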