Java AWS SDK 2.x 异步客户端 - 无法覆盖线程池执行器

Java AWS SDK 2.x Async Client - can’t override thread pool executor

我想为 AWS SDK 2.x 的异步客户端(例如 SnsAsyncClient)设置自定义线程池执行器。我看到对于 AWS SDK 1.x,客户端生成器上存在 withExecutorFactory,对于 2.x,我们需要在客户端上设置 ClientAsyncConfiguration 并调用 asyncConfiguration 方法建造者:

ClientAsyncConfiguration.Builder asyncConfig = ClientAsyncConfiguration.builder()
        .advancedOptions(Map.of(SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR,
                myCustomExecutor));
SnsAsyncClient.builder()
        .asyncConfiguration(asyncConfig.build())
        .build();

仍然,自定义线程池执行器不适用。我希望 CompletableFuture 上的所有异步调用和后续调用都将在自定义执行器上执行,但事实并非如此。 在调试期间,我看到逻辑在不同的线程池上执行:aws-java-sdk-NettyEventLoop-0-XForkJoinPool.commonPool-worker-X。 AWS SNS 异步调用示例:

snsAsyncClient.publish(publishRequest)
        .thenApplyAsync(..)
        .whenCompleteAsync(..);

如何设置自定义线程池,让异步非阻塞IO操作在特定的线程池上执行?

AWS SDK 版本:software.amazon.awssdk sdk-core 2.7.3

异步配置SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR的思路是提供线程池执行器,用于CompletableFuture的后续调用,如thenApply(..)whenComplete(..)。由于 CompletableFuture 实现,此执行器将不会应用于 thenApplyAsyncwhenCompleteAsync 等方法(如果执行器未传递到 thenApplyAsync,将默认使用 ForkJoinPool.commonPool(),或者我们可以将自定义执行程序作为第二个方法参数传递)。

snsAsyncClient.publish(publishRequest)
        .thenApply(..)
        .whenComplete(..);

thenApplywhenComplete 中的代码将在来自 FUTURE_COMPLETION_EXECUTOR 的已配置执行程序上处理。

为了设置异步客户端配置并提高性能,我们应该创建 NettyNioAsyncHttpClient.Builder 的实例并使用方法 httpClientBuilder

将其传递到异步客户端
var asyncHttpClientBuilder = NettyNioAsyncHttpClient.builder()
        .maxConcurrency(100)
        .connectionTimeout(Duration.ofSeconds(20))
        .connectionAcquisitionTimeout(Duration.ofSeconds(20));
SnsAsyncClient.builder()
        .httpClientBuilder(asyncHttpClientBuilder)
        .build();