ListenableFuture中future.isDone()时Java如何保证回调被调用

How does Java guarantee callback is called when future.isDone() in ListenableFuture

当通过 ListenableFutre 界面时,它在文档中指出

addListener()

Registers a listener to be run on the given executor. The listener will run when the Future's computation is complete or, if the computation is already complete, immediately.`

由于 Future.get() 是一个阻塞调用,Java 如何保证某些未来已完成?他们在旋转这个吗?我知道使用像匕首生产者这样的框架,这很容易理解(一旦任务完成,写一些东西,监控线程就会收到通知)。在 ListenableFuture 的情况下,jvm 是否支持开箱即用的东西?

使用 wait()/notify() 之类的机制 ?


跟进问题:正如你们所说的,调用者实际上保证监听者是运行,正常情况下使用ListenableFuture 将是 ListenableFuture future = Caller.call(),调用者和被调用者在不同的线程甚至不同的 JVM 中,这是如何在 java 中完成的?监听器存储在调用者线程和被调用者线程中?或者在不同的 JVM 中使用远程注册表?

ListenableFuture 没有什么神奇之处 - 接口的契约只要求任何实现在完成时调用任何已注册的侦听器(或立即,如果已经完成)。

查看一个这样的实现可能会有所帮助,AbstractFuture - specifically look at the .complete() method,它在未来变为 "done"(通过完成、失败或被取消)后立即调用。为了既快速又线程安全,细节有些复杂,但基本上就是这样。

如前所述,了解 ListenableFuture 的最佳方式是查看其实现方式。当您调用 addListener(Runnable listener, Executor exec) 时,您提供了一个 Runnable 侦听器和一个 Executor 到 运行 这个侦听器,因此由您决定如何执行侦听器。

the listener is stored in both the caller thread and callee thread ?

监听器存储在future里面,在ExecutionList:

// The execution list to hold our executors.
private final ExecutionList executionList = new ExecutionList();

addListener(Runnable listener, Executor exec) 仅执行以下操作:

public void addListener(Runnable listener, Executor exec) {
    executionList.add(listener, exec);
}

所以当 future 完成时,它调用 set(V value) 方法:

protected boolean set(@Nullable V value) {
    boolean result = sync.set(value);
    if (result) {
      executionList.execute();
    }
    return result;
}

所有的监听器都是这样执行的:executor.execute(runnable);

我要补充答案。

番石榴不保证。 如果您关闭 JVM 或 JVM 崩溃,则不会调用任何侦听器。 如果您在不取消期货的情况下关闭执行器,则也不会调用任何侦听器。我是说这种情况:

ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactory() {
    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r);
        t.setDaemon(false);
        return t;
    }
});
ListenableFuture<?> listenableFuture = JdkFutureAdapters.listenInPoolThread(
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("run!!!");
            }
        }),
        executorService
);
Futures.addCallback(listenableFuture, new FutureCallback<Object>() {

    @Override
    public void onSuccess(@Nullable Object result) {
        System.out.println("onSuccess");
    }

    @Override
    public void onFailure(Throwable t) {
        System.out.println("onFailure");
    }
});
MoreExecutors.shutdownAndAwaitTermination(executorService, 3, TimeUnit.SECONDS);

我没看到 "onSuccess" 或 "onFailure",是吗?

在通常的工作流程中,当 JVM 为 运行 时,Guava 使用 CAS 仅调用一次侦听器,您也可以在源代码中看到它。