Vert.x executeBlocking: different behaviour

I am experimenting with Vert.x executeBlocking to simulate a real-time scenario, and this is what I have:

vertx.setPeriodic(1000, id ->{
    counter += 1;
    LOGGER.info("invoked method {} ",counter);
    vertx.executeBlocking(future -> {
        int counterFinal = counter;
        String result = service.blockingMethod("cycle "+counterFinal+" executed");
        future.complete(result);
    }, res -> {
        LOGGER.info(String.format("The result is: %s", res.result()));
    });
});

The blocking method is very simple:

public String blockingMethod(String result){
    block(2);
    return result;
}

This is the result:

07:50:27.742 [vert.x-eventloop-thread-0] INFO lab.async.base.verticle.AsyncExperimentalVerticle - invoked method 1 
07:50:28.742 [vert.x-eventloop-thread-0] INFO lab.async.base.verticle.AsyncExperimentalVerticle - invoked method 2 
07:50:29.740 [vert.x-eventloop-thread-0] INFO lab.async.base.verticle.AsyncExperimentalVerticle - invoked method 3 
07:50:29.764 [vert.x-eventloop-thread-0] INFO lab.async.base.verticle.AsyncExperimentalVerticle - The result is: cycle 1 executed
07:50:30.739 [vert.x-eventloop-thread-0] INFO lab.async.base.verticle.AsyncExperimentalVerticle - invoked method 4 
07:50:31.739 [vert.x-eventloop-thread-0] INFO lab.async.base.verticle.AsyncExperimentalVerticle - invoked method 5 
07:50:31.773 [vert.x-eventloop-thread-0] INFO lab.async.base.verticle.AsyncExperimentalVerticle - The result is: cycle 3 executed
07:50:32.751 [vert.x-eventloop-thread-0] INFO lab.async.base.verticle.AsyncExperimentalVerticle - invoked method 6 
07:50:33.748 [vert.x-eventloop-thread-0] INFO lab.async.base.verticle.AsyncExperimentalVerticle - invoked method 7 
07:50:33.789 [vert.x-eventloop-thread-0] INFO lab.async.base.verticle.AsyncExperimentalVerticle - The result is: cycle 5 executed

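It clearly skips events (only cycles 1, 3 and 5 are reported), since the delay is set to 2 seconds. I then wrapped the blocking method in a class and executed it in the following way: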

vertx.setPeriodic(1000, id ->{
    counter++;
    LOGGER.info("invoked method {} ",counter);
    service.wrapperMethod("Hello", counter, new Handler<AsyncClass>() {
        @Override
        public void handle(AsyncClass event) {
            vertx.executeBlocking(future -> {
                String result = event.result();
                future.complete(result);
            }, res -> {
                LOGGER.info(String.format("The result is: %s", res.result()));
            });
        }
    });
});

The wrapper method is designed like this:

public void wrapperMethod(String input, int cycle, Handler<AsyncClass> execute) {
    AsyncClass instance = new AsyncClass(input,String.valueOf(cycle)); // my custom class where the result method has a 2 sec delay
    execute.handle(instance);
}

Then I got the expected result:

08:08:27.358 [vert.x-eventloop-thread-0] INFO lab.async.base.verticle.TestVerticle2 - invoked method 1 
08:08:27.368 [vert.x-worker-thread-0] INFO lab.async.base.support.AsyncClass - Invoking method inside AsyncClass class
08:08:28.338 [vert.x-eventloop-thread-0] INFO lab.async.base.verticle.TestVerticle2 - invoked method 2 
08:08:29.345 [vert.x-eventloop-thread-0] INFO lab.async.base.verticle.TestVerticle2 - invoked method 3 
08:08:29.384 [vert.x-worker-thread-0] INFO lab.async.base.support.AsyncClass - Invoking method inside AsyncClass class
08:08:29.386 [vert.x-eventloop-thread-0] INFO lab.async.base.verticle.TestVerticle2 - The result is: Hello world of cycle 1
08:08:30.347 [vert.x-eventloop-thread-0] INFO lab.async.base.verticle.TestVerticle2 - invoked method 4 
08:08:31.351 [vert.x-eventloop-thread-0] INFO lab.async.base.verticle.TestVerticle2 - invoked method 5 
08:08:31.391 [vert.x-worker-thread-0] INFO lab.async.base.support.AsyncClass - Invoking method inside AsyncClass class
08:08:31.391 [vert.x-eventloop-thread-0] INFO lab.async.base.verticle.TestVerticle2 - The result is: Hello world of cycle 2
08:08:32.341 [vert.x-eventloop-thread-0] INFO lab.async.base.verticle.TestVerticle2 - invoked method 6 
08:08:33.343 [vert.x-eventloop-thread-0] INFO lab.async.base.verticle.TestVerticle2 - invoked method 7 
08:08:33.396 [vert.x-worker-thread-0] INFO lab.async.base.support.AsyncClass - Invoking method inside AsyncClass class
08:08:33.397 [vert.x-eventloop-thread-0] INFO lab.async.base.verticle.TestVerticle2 - The result is: Hello world of cycle 3

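Now I see asynchronous execution with no events missed, and I cannot find an explanation for it. Even in the wrapper method, if I add a delay of n seconds, it misses events as expected.

Could someone please help me understand this behaviour?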

Update 1:

For the second scenario, AsyncClass is structured as follows:

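import java.util.concurrent.TimeUnit;
// Logger and LoggerFactory imports are not shown in the question; SLF4J (org.slf4j) is assumed.

public class AsyncClass {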

    private static final Logger LOGGER = LoggerFactory.getLogger(AsyncClass.class);

    private String input;
    private String cycle;

    public AsyncClass(String input, String cycle) {
        this.input = input;
        this.cycle = cycle;
    }

    public String result(){
        LOGGER.info("Invoking method inside AsyncClass class");
        block(2);
        return input+" world of cycle "+cycle;
    }

    private void block(int pauseLimitInSecond){
        try {
            TimeUnit.SECONDS.sleep(pauseLimitInSecond);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOGGER.error("exception - > ", e);
        }
    }
}

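The executeBlocking overload you are using (the one without an ordered argument) makes sure the blocking tasks are executed in order, one after the other.

So by the time the second blocking task starts (two seconds later), the counter variable has already been incremented twice. Hence the 1, 3, 5 sequence.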
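To see where the 1, 3, 5 sequence comes from, here is a toy model in plain Java (it is not Vert.x code). It assumes the timer fires once per second, the worker runs one task at a time in submission order, each task reads the counter at the moment it starts, and a timer tick that coincides with a task start is handled first. The class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class OrderedQueueModel {

    /**
     * Models a timer that ticks at t = 1, 2, 3, ... seconds and keeps ticking.
     * Tick k increments the counter to k and queues task k on a single worker.
     *
     * @param tasks       how many queued tasks to follow
     * @param taskSeconds how long each blocking task occupies the worker
     * @return the counter value each task sees when it finally starts
     */
    public static List<Integer> observedCounters(int tasks, int taskSeconds) {
        List<Integer> observed = new ArrayList<>();
        int workerFreeAt = 0; // time at which the worker can take the next task
        for (int k = 1; k <= tasks; k++) {
            // Task k cannot start before it is queued (t = k)
            // nor before the previous task has finished.
            int start = Math.max(k, workerFreeAt);
            // The counter equals the number of ticks so far, i.e. `start`.
            observed.add(start);
            workerFreeAt = start + taskSeconds;
        }
        return observed;
    }

    public static void main(String[] args) {
        System.out.println(observedCounters(4, 2)); // prints [1, 3, 5, 7]
        System.out.println(observedCounters(3, 1)); // prints [1, 2, 3]
    }
}
```

With a 2-second task and a 1-second timer, every task after the first starts two ticks later than the previous one, which matches the log in the question. With a task that takes no longer than the timer period, nothing appears to be skipped.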

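In your other attempt with the wrapper class, the value of the counter variable is captured before executeBlocking is invoked. So when the blocking task executes, you get the value you expected.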

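The observed behaviour is due to variable capture in the lambda body: the lambda captures the outer class reference (this), not the current value of counter.

By the time the lambda body is evaluated, the outer class instance variable counter will have changed (it has been incremented past the value it had when the lambda was created), which gives the illusion of unexpected behaviour.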
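The capture rule can be reproduced without Vert.x. The sketch below is only an illustration under stated assumptions: a single-thread ExecutorService stands in for the ordered worker, a CountDownLatch holds the tasks back until all three have been submitted, and the class name CaptureDemo is made up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CaptureDemo {

    private int counter = 0; // mutable field, reached through the captured `this`

    /** Submits three tasks and returns the counter value each one reports. */
    public List<Integer> run(boolean snapshot) throws Exception {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        CountDownLatch allSubmitted = new CountDownLatch(1);
        List<Future<Integer>> pending = new ArrayList<>();
        try {
            for (int tick = 0; tick < 3; tick++) {
                counter++;
                final int captured = counter; // value copied at submission time
                pending.add(worker.submit(() -> {
                    allSubmitted.await(); // wait until every tick has happened
                    // `captured` is a frozen copy; `counter` is read from the
                    // field only now, when the task actually runs.
                    return snapshot ? captured : counter;
                }));
            }
            allSubmitted.countDown(); // release the queued tasks
            List<Integer> seen = new ArrayList<>();
            for (Future<Integer> f : pending) {
                seen.add(f.get());
            }
            return seen;
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(new CaptureDemo().run(false)); // prints [3, 3, 3]
        System.out.println(new CaptureDemo().run(true));  // prints [1, 2, 3]
    }
}
```

In the question's first snippet, the line `int counterFinal = counter;` sits inside the blocking lambda, so it behaves like the `snapshot == false` case. Moving that line to just before the `vertx.executeBlocking` call would turn it into the snapshot case.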

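Keeping the same program flow, you can replace the lambda expression with a Handler<Future<String>> implementation in which the value of the outer counter instance variable is stored in the handler's own instance variable, and that copy is what the handler body uses: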

private static final class BlockingHandler implements Handler<Future<String>> {

    private final YourBlockingService service;

    private final int counter;

    public BlockingHandler(int counter, YourBlockingService service) {
        this.counter = counter;
        this.service = service;
    }

    @Override
    public void handle(Future<String> event) {
        // Uses the one-argument blockingMethod(String) shown in the question.
        String result = service.blockingMethod("cycle " + this.counter + " executed");
        event.complete(result);
    }
}

Your verticle code would then look like this:

this.vertx.setPeriodic(
      1000,
      id -> {
          counter += 1;
          LOGGER.info("invoked method {}", counter);
          vertx.executeBlocking(
                new BlockingHandler(this.counter, this.service),
                res -> LOGGER.info(String.format("The result is: %s", res.result()))
          );
      }
);

To sum up, the behaviour described above is related only to closure semantics and has nothing to do with Vert.x internals.