将threadpoolexecutor传递给CompletableFuture有哪些方式?

What are the ways to pass threadpoolexecutor to CompletableFuture?

我最近一直在研究 Java CompletableFuture 并发现,我们应该始终使用自定义的线程池。有了它,我找到了两种将线程池传递给现有代码的方法。喜欢下面

这是我在配置文件中的线程池

@Override
@Bean(name = "commonThreadPool")
public Executor getAsyncExecutor() {
  return new ThreadPoolTaskExecutor();
}

1.在参数中传递 existingThreadPool。

 @Autowired
 @Qualifier("commonThreadPool") 
 TaskExecutor existingThreadPool;       
 CompletableFuture.runAsync(() -> executeTask(),existingThreadPool);

2。像下面这样使用异步

@Async("commonThreadPool")
public void executeTask() {
// Execute Some Task
}

是否有任何第三种方法可以让我编写 CompletableFuture 处理程序或在我可以传递自定义线程池的单个位置覆盖其现有行为。之后无论我在哪里使用下面的代码,它都应该选择我现有的 ThreadPool 而不是 forkJoin 池。

 CompletableFuture.runAsync(() -> executeTask());

我强烈建议不要这样做,但如果你真的想要,你可以使用反射来更改可完成的未来使用的线程池。

public static void main(String[] args) throws Exception {
    // Prints ForkJoinPool.commonPool-worker-1
    CompletableFuture<Void> c = CompletableFuture.runAsync(() -> System.out.println(Thread.currentThread().getName()));
    c.get();

    setFinalStatic(CompletableFuture.class.getDeclaredField("asyncPool"), Executors.newFixedThreadPool(10));

    // Prints pool-1-thread-1
    c = CompletableFuture.runAsync(() -> System.out.println(Thread.currentThread().getName()));
    c.get();
}

static void setFinalStatic(Field field, Object newValue) throws Exception {
    field.setAccessible(true);
    Field modifiersField = Field.class.getDeclaredField("modifiers");
    modifiersField.setAccessible(true);
    modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
    field.set(null, newValue);
}

setFinalStatic 取自

没有标准方法可以替换所有 CompletableFuture 实例的默认执行程序。但是从 Java 9 开始,您可以为子 classes 定义一个默认执行器。例如。与

public class MyCompletableFuture<T> extends CompletableFuture<T> {
    static final Executor EXEC = r -> {
        System.out.println("executing "+r);
        new Thread(r).start();
    };

    @Override
    public Executor defaultExecutor() {
        return EXEC;
    }

    @Override
    public <U> CompletableFuture<U> newIncompleteFuture() {
        return new MyCompletableFuture<>();
    }

    public static CompletableFuture<Void> runAsync​(Runnable runnable) {
        Objects.requireNonNull(runnable);
        return supplyAsync(() -> {
            runnable.run();
            return null;
        });
    }

    public static <U> CompletableFuture<U> supplyAsync​(Supplier<U> supplier) {
        return new MyCompletableFuture<U>().completeAsync(supplier);
    }
}

您已完成为 MyCompletableFuture 的所有链接阶段定义默认执行程序的所有必要步骤。 EXEC 中的 executor hold 仅用作示例,使用时会生成打印输出,因此当您使用该示例时 class like

MyCompletableFuture.supplyAsync(() -> "test")
    .thenApplyAsync(String::toUpperCase)
    .thenAcceptAsync(System.out::println);

它将打印

executing java.util.concurrent.CompletableFuture$AsyncSupply@65ab7765
executing java.util.concurrent.CompletableFuture$UniApply@119d7047
executing java.util.concurrent.CompletableFuture$UniAccept@404b9385
TEST