如何拦截主线程下的Runnable创建和执行以使用AspectJ填充上下文流数据

How to intercept Runnable creation and execution under master thread to populate context flow data using AspectJ

原刊

将 Java MDC 从一个线程填充到所有它生成的内部线程(父子关系)

使用 AspectJ 的 WIP 解决方案

我能够编写一个方面来拦截所有 Runnable 创建,但是因为我希望每次使用都有一个不同的方面实例(使用自定义注释),因为我必须在执行代码时将 MDC 存储在某个地方来自父线程,我无法编写切入点拦截新创建的 Runnable 实例,因此我可以使用先前的上下文映射设置 MDC。

这是看点

@Aspect("percflow(@annotation(com.bell.cts.commons.cron.framework.scheduler.domain.MDCTrace))")
public class MDCTraceAspect {

  private final Logger logger = LoggerFactory.getLogger(MDCTraceAspect.class);
  private int i;
  private final Map<String, String> contextMap;

  public MDCTraceAspect() {
    i = new Random().nextInt();
    MDC.clear();
    MDC.put("IP", String.valueOf(i));
    contextMap = MDC.getCopyOfContextMap();
    logger.debug(String.format("[%d] New Aspect", Thread.currentThread().getId()));
  }

  @Before("execution(Runnable+.new(..))")
  public void beforeNewRunnable(JoinPoint joinPoint) {
    MDC.setContextMap(contextMap);
    logger.debug(String.format("[%d] New Runnable", Thread.currentThread().getId()));
  }

  @Before("execution(* Runnable+.*(..))")
  public void before(JoinPoint joinPoint) {
    MDC.setContextMap(contextMap);
    logger.info(String.format("[%d] RUNNABLE WORKS!", Thread.currentThread().getId()));
  }

  @Before("execution(void Child.run())")
  public void beforeChildRun(JoinPoint joinPoint) {
    MDC.setContextMap(contextMap);
    logger.info(String.format("[%d] CHILD WORKS!", Thread.currentThread().getId()));
  }
}

这里是 ParentChild 和自定义注释

public class Parent {

  private final Logger logger = LoggerFactory.getLogger(Parent.class);
  private ExecutorService executorService;

  @MDCTrace
  public void runMultiThreadByExecutor() throws InterruptedException {
    executorService = Executors.newCachedThreadPool();
    logger.info(String.format("[%d] Before start child thread", Thread.currentThread().getId()));

    executorService.submit(new Child());
    logger.info(String.format("[%d] After start child thread", Thread.currentThread().getId()));

    List.of(10, 11, 12, 13, 14).parallelStream().forEach(i -> {
      logger.info(String.format("[%d] Loop iteration #%d", Thread.currentThread().getId(), i));
    });

    executorService.shutdown();
    executorService.awaitTermination(1, TimeUnit.SECONDS);
    logger.info(String.format("[%d] ExecutorService is over", Thread.currentThread().getId()));
  }

  public static void main(String[] args) throws InterruptedException {
    Parent parent = new Parent();
    parent.runMultiThreadByExecutor();
  }
}
public class Child implements Runnable {

  private final Logger logger = LoggerFactory.getLogger(Child.class);

  @Override
  public void run() {
    logger.info(String.format("[%d] Running in the child thread", Thread.currentThread().getId()));
  }
}
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface MDCTrace {
}

目标

最终目标是简单地注释 MDC 上下文的入口点,以便在执行注释方法(甚至其他对象)时创建的任何 threads/runnables/futures 被拦截,因此使用 [=52] 正确设置 MDC =] 线程 MDC 信息存储在当前上下文流的方面实例中。

暂定 beforebeforeChildRun 都不起作用,我找不到如何使其中之一起作用。

谢谢

奖励积分 如果有人可以指导我如何使它也适用于 parallelStream

首先你需要了解一个新线程不在其父线程的控制流之内。请参阅我的其他答案以获取解释,包括。示例代码和控制台日志:

因此,正如您已经注意到的那样,与 cflow() 或方面实例化 percflow() 相关的任何内容在这种情况下都不起作用。

获得您需要的一部分的唯一方法 - 如果您使用编译时编织,至少对于您自己的 classes 以及第三方 JARs/classes(除了 JRE classes) 如果你使用加载时编织 - 是手动簿记。

看看这个例子,我稍微修改了您自己的代码以显示变通方法及其限制。我还想避免使用任何日志记录框架,而是打印到 System.out。因此,我不得不用虚拟 class 替换 MDC 以使代码编译。

package de.scrum_master.app;

import java.util.HashMap;
import java.util.Map;

public class MDC {
  private static ThreadLocal<Map<String, String>> contextMap = new InheritableThreadLocal<>();

  static {
    clear();
  }

  public static void clear() {
    contextMap.set(new HashMap<>());
  }

  public static void put(String key, String value) {
    contextMap.get().put(key, value);
  }

  public static Map<String, String> getCopyOfContextMap() {
    return new HashMap<>(contextMap.get());
  }

  public static void setContextMap(Map<String, String> contextMap) {
    MDC.contextMap.set(contextMap);
  }
}
package de.scrum_master.app;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface MDCTrace {}
package de.scrum_master.app;

public class Child implements Runnable {
  @Override
  public void run() {
    System.out.println(String.format("[%d] Running in the child thread", Thread.currentThread().getId()));
  }
}
package de.scrum_master.app;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Parent {
  private ExecutorService executorService;

  @MDCTrace
  public Runnable runMultiThreadByExecutorAnnotated(Runnable runnable) throws InterruptedException {
    return doStuff(runnable);
  }

  @MDCTrace
  public Runnable runMultiThreadByExecutorAnnotated() throws InterruptedException {
    return doStuff();
  }

  public Runnable runMultiThreadByExecutorPlain() throws InterruptedException {
    return doStuff();
  }

  public Runnable runMultiThreadByExecutorPlain(Runnable runnable) throws InterruptedException {
    return doStuff(runnable);
  }

  private Runnable doStuff() throws InterruptedException {
    return doStuff(new Child());
  }

  private Runnable doStuff(Runnable runnable) throws InterruptedException {
    executorService = Executors.newCachedThreadPool();
    System.out.println(String.format("[%d] Before start child thread", Thread.currentThread().getId()));

    executorService.submit(runnable);
    System.out.println(String.format("[%d] After start child thread", Thread.currentThread().getId()));

    List.of(10, 11, 12, 13, 14).parallelStream().forEach(i -> {
      //System.out.println(String.format("[%d] Loop iteration #%d", Thread.currentThread().getId(), i));
    });

    executorService.shutdown();
    executorService.awaitTermination(1, TimeUnit.SECONDS);
    System.out.println(String.format("[%d] ExecutorService is over", Thread.currentThread().getId()));
    System.out.println("\n----------------------------------------\n");
    return runnable;
  }

  public static void main(String[] args) throws InterruptedException {
    Parent parent = new Parent();
    System.out.println("MDCTrace annotation");
    parent.runMultiThreadByExecutorAnnotated();
    System.out.println("No annotation");
    parent.runMultiThreadByExecutorPlain();

    Runnable runnable = new Child();
    System.out.println("MDCTrace annotation (runnable created outside of control flow)");
    parent.runMultiThreadByExecutorAnnotated(runnable);
    System.out.println("No annotation (re-use runnable created outside of control flow)");
    parent.runMultiThreadByExecutorPlain(runnable);

    System.out.println("MDCTrace annotation (save returned runnable)");
    runnable = parent.runMultiThreadByExecutorAnnotated();
    System.out.println("No annotation (re-use returned runnable)");
    parent.runMultiThreadByExecutorPlain(runnable);
}
}

如您所见,我有一个正面和一个负面的测试示例(有和没有 @MDCTrace 注释)以及每个示例的三个案例:

  1. 像您在自己的示例中所做的那样,在带注释(或未带注释)方法的控制流中创建可运行对象。
  2. 在注释(或非注释)方法的控制流之外创建可运行对象,通过引用将它们传递到控制流中。
  3. 在注释方法的控制流中创建第一个可运行对象,返回它并将其传递到非注释方法的控制流中。

编号 2 和编号 3 用于说明后续方面方法的局限性,主要包括对在注释方法的控制流中创建的所有 Runnable 实例进行手动簿记。

package de.scrum_master.aspect;

import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;

import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;

import de.scrum_master.app.MDC;

@Aspect
public class MDCTraceAspect {
  private static final Random RANDOM = new Random(); 
  private Map<String, String> contextMap;
  private Set<Runnable> runnables = new HashSet<>();

  @Pointcut("@annotation(de.scrum_master.app.MDCTrace) && execution(* *(..))")
  private static void entryPoint() {}

  @Before("entryPoint()")
  public void executeEntryPoint() {
    MDC.clear();
    MDC.put("IP", String.valueOf(RANDOM.nextInt()));
    contextMap = MDC.getCopyOfContextMap();
    System.out.println(String.format("[%d] * Entry point", Thread.currentThread().getId()));
  }

  @Before("execution(Runnable+.new(..)) && cflow(entryPoint()) && target(runnable)")
  public void beforeNewRunnable(JoinPoint joinPoint, Runnable runnable) {
    runnables.add(runnable);
    MDC.setContextMap(contextMap);
    System.out.println(String.format("[%d] * New Runnable", Thread.currentThread().getId()));
  }

  @Before("execution(public void Runnable+.run(..)) && target(runnable)")
  public void beforeRunnableExecution(JoinPoint joinPoint, Runnable runnable) {
    if (!runnables.contains(runnable))
      return;
    MDC.setContextMap(contextMap);
    System.out.println(String.format("[%d] * Runnable started", Thread.currentThread().getId()));
  }
}

这会产生以下控制台日志(分为 3 个部分):


  1. 在注释(或非注释)方法的控制流内创建可运行对象,就像您在自己的示例中所做的那样:
MDCTrace annotation
[1] * Entry point
[1] * New Runnable
[1] Before start child thread
[1] After start child thread
[12] * Runnable started
[12] Running in the child thread
[1] ExecutorService is over

----------------------------------------

No annotation
[1] Before start child thread
[1] After start child thread
[13] Running in the child thread
[1] ExecutorService is over

----------------------------------------

这如您所料。这里没有惊喜。


  1. 在注释(或非注释)方法的控制流之外创建可运行对象,通过引用将它们传递到控制流中:
MDCTrace annotation (runnable created outside of control flow)
[1] * Entry point
[1] Before start child thread
[1] After start child thread
[14] Running in the child thread
[1] ExecutorService is over

----------------------------------------

No annotation (re-use runnable created outside of control flow)
[1] Before start child thread
[1] After start child thread
[15] Running in the child thread
[1] ExecutorService is over

----------------------------------------

如您所见,到达入口点后,此处没有日志输出。这可能不是您想要的,但可运行对象已在控制流之外创建并传入,因此不会在此处触发方面。


  1. 在注释方法的控制流中创建第一个 runnable,返回它并将其传递到非注释方法的控制流中:
MDCTrace annotation (save returned runnable)
[1] * Entry point
[1] * New Runnable
[1] Before start child thread
[1] After start child thread
[16] * Runnable started
[16] Running in the child thread
[1] ExecutorService is over

----------------------------------------

No annotation (re-use returned runnable)
[1] Before start child thread
[1] After start child thread
[17] * Runnable started
[17] Running in the child thread
[1] ExecutorService is over

----------------------------------------

这里的 A 部分就像第一个案例。 1,但是 B 部分还打印了非注释方法的日志行,因为 Runnable 实例在注释方法的控制流期间已在方面的簿记中注册。所以在这里您会看到一条您可能更想避免的日志行。

那么这里的结论是什么?没有完美的解决方案,您需要检查您的代码以及那里有哪些情况,然后设计方面以适应这些情况。如果你没有像我编造的那样的案例。 2 和 3,我的方法有效。

其他一些注意事项:

  • 注意 RunnableThread 的区别。它们不一样,你可以在多个线程中重复使用相同的runnable。此外,您也可以重复使用线程,例如通过使用线程池。所以这可以变得任意复杂。每个标记为您的方面的目标的可运行或线程稍后可以在您不想记录的上下文中重新使用。
  • 对于并行流或其他由 JRE 本身创建可运行对象的情况,这永远不会起作用,因为由内部 JRE classes 创建的可运行对象和线程不受方面编织的影响,在编译时也不在加载时编织的情况下。理论上,您可以将方面代码编织到 JRE 或 JDK 中,从编织的 classes 创建新的 JAR 并替换原始文件或将它们添加到引导 class 路径。但这有点复杂,您确实需要控制应用程序的执行环境,以便使用正确的参数启动 JVM。我之前这样做过并且有效,但这不适合初学者,开始时可能是个坏主意。