CompletableFuture / ForkJoinPool 设置 Class 装载机

CompletableFuture / ForkJoinPool Set Class Loader

我解决了一个非常具体的问题,其解决方案似乎是基本的:

我的 (Spring) 应用程序的 classloader 层次结构是这样的:SystemClassLoader -> PlatformClassLoader -> AppClassLoader

如果我使用 Java CompleteableFuture 到 运行 线程。线程的 ContextClassLoader 是:SystemClassLoader -> PlatformClassLoader -> ThreadClassLoader

因此,我无法访问 AppClassLoader 中的任何 class,尽管我必须这样做,因为所有外部库 class 都驻留在那里。

源代码库非常大,所以我不想 to/can 将所有与线程相关的部分重写为其他内容(例如,将自定义执行程序传递给每个调用)。

所以我的问题是:我怎样才能创建由例如创建的线程? CompleteableFuture.supplyAsync() 使用 AppClassLoader 作为 parent?(而不是 PlatformClassloader

我发现 ForkJoinPool is used to create the threads. But as it seems to me, everything there is static and final. So I doubt that even setting a custom ForkJoinWorkerThreadFactory 和系统 属性 在这种情况下会有所帮助。还是会?

编辑以回答评论中的问题:

完整堆栈跟踪:

java.lang.IllegalArgumentException: org.keycloak.admin.client.resource.RealmsResource referenced from a method is not visible from class loader
    at java.base/java.lang.reflect.Proxy$ProxyBuilder.ensureVisible(Proxy.java:851) ~[na:na]
    at java.base/java.lang.reflect.Proxy$ProxyBuilder.validateProxyInterfaces(Proxy.java:682) ~[na:na]
    at java.base/java.lang.reflect.Proxy$ProxyBuilder.<init>(Proxy.java:628) ~[na:na]
    at java.base/java.lang.reflect.Proxy.lambda$getProxyConstructor(Proxy.java:426) ~[na:na]
    at java.base/jdk.internal.loader.AbstractClassLoaderValue$Memoizer.get(AbstractClassLoaderValue.java:327) ~[na:na]
    at java.base/jdk.internal.loader.AbstractClassLoaderValue.computeIfAbsent(AbstractClassLoaderValue.java:203) ~[na:na]
    at java.base/java.lang.reflect.Proxy.getProxyConstructor(Proxy.java:424) ~[na:na]
    at java.base/java.lang.reflect.Proxy.newProxyInstance(Proxy.java:999) ~[na:na]
    at org.jboss.resteasy.client.jaxrs.ProxyBuilder.proxy(ProxyBuilder.java:79) ~[resteasy-client-3.1.4.Final.jar!/:3.1.4.Final]
    at org.jboss.resteasy.client.jaxrs.ProxyBuilder.build(ProxyBuilder.java:131) ~[resteasy-client-3.1.4.Final.jar!/:3.1.4.Final]
    at org.jboss.resteasy.client.jaxrs.internal.ClientWebTarget.proxy(ClientWebTarget.java:93) ~[resteasy-client-3.1.4.Final.jar!/:3.1.4.Final]
    at org.keycloak.admin.client.Keycloak.realms(Keycloak.java:114) ~[keycloak-admin-client-3.4.3.Final.jar!/:3.4.3.Final]
    at org.keycloak.admin.client.Keycloak.realm(Keycloak.java:118) ~[keycloak-admin-client-3.4.3.Final.jar!/:3.4.3.Final]

似乎resteasy lib使用线程上下文classloader来加载一些资源: http://grepcode.com/file/repo1.maven.org/maven2/org.jboss.resteasy/resteasy-client/3.0-beta-1/org/jboss/resteasy/client/jaxrs/ProxyBuilder.java#21.

当 resteasy 尝试加载请求的 class 时,它会要求线程 classloader 查找并加载它,如果可能的话,当请求的 class 位于class加载程序不可见的路径,操作失败。

这正是您的应用程序发生的情况: ThreadClassLoader 试图加载位于应用程序 class 路径中的资源,因为此 class 路径中的资源只能从 AppClassLoader[=25 访问=] 及其子项,那么 ThreadClassLoader 无法加载它(ThreadClassLoader 不是 AppClassLoader[=25 的子项=]).

一个可能的解决方案是通过您的应用程序类加载器覆盖线程上下文类加载器: thread.setContextClassLoader(appClass.class.getClassLoader())

所以,这是一个非常肮脏的解决方案,我对此并不引以为豪,可能会破坏你的东西如果你接受它:

问题是 ForkJoinPool.commonPool() 没有使用应用程序的类加载器。因为 commonPool 的设置是静态的,因此在应用程序启动期间,以后很难(至少据我所知)进行更改。所以需要依赖Java反射API.

  1. 在您的应用程序成功启动后创建一个挂钩

    • 在我的例子中(Spring 引导环境)这将是 ApplicationReadyEvent
    • 要收听此事件,您需要如下组件

      @Component
      class ForkJoinCommonPoolFix : ApplicationListener<ApplicationReadyEvent> {
          override fun onApplicationEvent(event: ApplicationReadyEvent?) {
        }
      }
      
  2. 在你的钩子中,你需要将 commonPool 的 ForkJoinWorkerThreadFactory 设置为自定义实现(因此这个自定义实现将使用应用程序类加载器)

    • 在 Kotlin 中

      val javaClass = ForkJoinPool.commonPool()::class.java
      val field = javaClass.getDeclaredField("factory")
      field.isAccessible = true
      val modifiers = field::class.java.getDeclaredField("modifiers")
      modifiers.isAccessible = true
      modifiers.setInt(field, field.modifiers and Modifier.FINAL.inv())
      field.set(ForkJoinPool.commonPool(), CustomForkJoinWorkerThreadFactory())
      field.isAccessible = false
      
  3. CustomForkJoinWorkerThreadFactory

    的简单实现
    • 在 Kotlin 中

      //Custom class
      class CustomForkJoinWorkerThreadFactory : ForkJoinPool.ForkJoinWorkerThreadFactory {
        override fun newThread(pool: ForkJoinPool?): ForkJoinWorkerThread {
          return CustomForkJoinWorkerThread(pool)
        }
      }
      // helper class (probably only needed in kotlin)
      class CustomForkJoinWorkerThread(pool: ForkJoinPool?) : ForkJoinWorkerThread(pool)
      

如果您需要有关反射的更多信息 以及更改最终字段的原因 please refer to here and here。简短摘要:由于优化,更新的最终字段可能对其他对象不可见,并且可能会出现其他未知的副作用。

如前所述:这是一个非常肮脏的解决方案。如果您使用此解决方案,可能会出现不需要的副作用。使用这样的反射不是一个好主意。如果您可以使用没有反射的解决方案(并且post它作为答案!)。

编辑:单个调用的替代方法

如问题本身所述:如果您只在少数地方遇到此问题(即修复调用本身没有问题),您可以使用自己的 Executor. A simple example copied from here:

ExecutorService pool = Executors.newFixedThreadPool(10);
final CompletableFuture<String> future = 
    CompletableFuture.supplyAsync(() -> { /* ... */ }, pool);

我 运行 研究了类似的东西,并提出了一个不使用反射的解决方案,并且似乎可以很好地与 JDK9-JDK11 配合使用。

javadocs 是这样说的:

The parameters used to construct the common pool may be controlled by setting the following system properties:

  • java.util.concurrent.ForkJoinPool.common.threadFactory - the class name of a ForkJoinPool.ForkJoinWorkerThreadFactory. The system class loader is used to load this class.

因此,如果您推出自己的 ForkJoinWorkerThreadFactory 版本并将其设置为使用系统 属性 使用正确的 ClassLoader,这应该可行。

这是我的习惯 ForkJoinWorkerThreadFactory:

package foo;

public class MyForkJoinWorkerThreadFactory implements ForkJoinWorkerThreadFactory {

    @Override
    public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
        return new MyForkJoinWorkerThread(pool);
    }

    private static class MyForkJoinWorkerThread extends ForkJoinWorkerThread {

        private MyForkJoinWorkerThread(final ForkJoinPool pool) {
            super(pool);
            // set the correct classloader here
            setContextClassLoader(Thread.currentThread().getContextClassLoader());
        }
    }
} 

然后在您的应用程序启动脚本中设置系统属性

-Djava.util.concurrent.ForkJoinPool.common.threadFactory=foo.MyForkJoinWorkerThreadFactory

上述解决方案假设当第一次引用 ForkJoinPool class 并初始化 commonPool 时,此线程的上下文类加载器是您需要的正确类加载器(并且是不是系统 class 加载器)。

这里有一些 background 可能有帮助:

Fork/Join common pool threads return the system class loader as their thread context class loader.

In Java SE 9, threads that are part of the fork/join common pool will always return the system class loader as their thread context class loader. In previous releases, the thread context class loader may have been inherited from whatever thread causes the creation of the fork/join common pool thread, e.g. by submitting a task. An application cannot reliably depend on when, or how, threads are created by the fork/join common pool, and as such cannot reliably depend on a custom defined class loader to be set as the thread context class loader.

由于上述向后不兼容的变化,使用曾经在 JDK8 中工作的 ForkJoinPool 的东西在 JDK9+ 中可能无法工作。

在 jdk11 中有效的一个可能解决方案(使用 Spring Boot 2.2 测试)是利用 ForkJoinPool

中的新构造函数

主要想法是使用自定义 ThreadFactory 创建一个自定义 ForkJoinPool,该 ThreadFactory 使用我们自己的 ClassLoader(不是系统类加载器 - 此行为始于 jdk9-)

一段历史
在 jdk9 ForkJoinPool.common() return 之前,return 是一个带有主线程类加载器的执行器,在 Java 9 this behave changes 中,return 是一个带有系统 jdk 的执行器] 系统类加载器。 因此,由于此更改,从 Java 8 升级到 Java 9 / 10 / 11 时,很容易在 CompletableFutures 代码中找到 ClassNotFoundExceptions。

解决方法 创建我们自己的工厂 并使用这个工厂创建一个 ForkJoinPool 和一个 Executor

MyForkJoinWorkerThreadFactory factory = new MyForkJoinWorkerThreadFactory();

ForkJoinPool myCommonPool = new ForkJoinPool(Math.min(32767, Runtime.getRuntime().availableProcessors()), factory, null, false);

这样使用

CompletableFuture.runAsync(() -> {
   log.info(Thread.currentThread().getName()+" "+Thread.currentThread().getContextClassLoader().toString());  
   // will print the classloader from the Main Thread, not the jdk system one :)
}, myCommonPool).join();

外球
如果您支持基于 Spring 的应用程序,应该有必要将您的 Spring 安全上下文添加到新的自定义线程池

@Bean(name = "customExecutor")
public Executor customExecutor() {
    MyForkJoinWorkerThreadFactory factory = new MyForkJoinWorkerThreadFactory();
    ForkJoinPool myCommonPool = new ForkJoinPool(Math.min(32767, Runtime.getRuntime().availableProcessors()), factory, null, false);

    DelegatingSecurityContextExecutor delegatingExecutorCustom = new DelegatingSecurityContextExecutor(myCommonPool, SecurityContextHolder.getContext());
    return delegatingExecutorCustom;
}

并像使用任何其他资源一样使用自动装配

@Autowired private Executor customExecutor;

CompletableFuture.runAsync(() -> {
    ....
}, customExecutor).join();