如何在 CompletableFuture 中保留 slf4j MDC 日志上下文?
How to retain slf4j MDC logging context in CompletableFuture?
执行异步 CompletableFuture
时,父线程上下文以及 org.slf4j.MDC
上下文都会丢失。
这很糟糕,因为我正在使用某种 "fish tagging" 来跟踪来自多个日志文件中的一个请求的日志。
MDC.put("fishid", randomId())
问题:一般情况下,如何在 CompletableFutures
的任务中保留该 ID?
List<CompletableFuture<UpdateHotelAllotmentsRsp>> futures =
tasks.stream()
.map(task -> CompletableFuture.supplyAsync(
() -> businesslogic(task))
.collect(Collectors.toList());
List results = futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
public void businesslogic(Task task) {
LOGGER.info("mdc fishtag context is lost here");
}
最后,我创建了一个保留 MDC
的 Supplier
包装器。如果大家有更好的想法欢迎评论。
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) {
return CompletableFuture.supplyAsync(new SupplierMDC(supplier), executor);
}
private static class SupplierMDC<T> implements Supplier<T> {
private final Supplier<T> delegate;
private final Map<String, String> mdc;
public SupplierMDC(Supplier<T> delegate) {
this.delegate = delegate;
this.mdc = MDC.getCopyOfContextMap();
}
@Override
public T get() {
MDC.setContextMap(mdc);
return delegate.get();
}
}
是的,Twitter Future 做到了这一点。他们有 class Local.scala Future.scala 知道的。
此修复程序供 java 作者修复此问题,以便您的本地状态遍历所有使用 CompletableFutures 的库。基本上,Local.scala 由 Future 使用并在内部使用 ThreadLocal 直到 .thenApply 或 .thenAccept 并且它将捕获状态并在需要时将其传输到下一个。这适用于所有第三方库,第 3 方库更改为零。
这里还有更多,但请戳 Java 作者修复他们的东西...
http://mail.openjdk.java.net/pipermail/core-libs-dev/2017-May/047867.html
到那时,MDC 将永远不会通过第 3 方库工作。
我的 SO post
Does CompletableFuture have a corresponding Local context?
我解决这个问题的最可读的方法如下 -
----------------线程实用程序class--------------------
public static Runnable withMdc(Runnable runnable) {
Map<String, String> mdc = MDC.getCopyOfContextMap();
return () -> {
MDC.setContextMap(mdc);
runnable.run();
};
}
public static <U> Supplier<U> withMdc(Supplier<U> supplier) {
Map<String, String> mdc = MDC.getCopyOfContextMap();
return (Supplier) () -> {
MDC.setContextMap(mdc);
return supplier.get();
};
}
----------------用法-------------
CompletableFuture.supplyAsync(withMdc(() -> someSupplier()))
.thenRunAsync(withMdc(() -> someRunnable())
....
ThreadUtils 中的 WithMdc 必须重载以包含 CompletableFuture 接受的其他功能接口
请注意,withMdc() 方法是静态导入的,以提高可读性。
我的解决方案主题是(它将与 JDK 9+ 一起使用,因为自该版本以来公开了几个可覆盖的方法)
Make the complete ecosystem aware of MDC
为此,我们需要解决以下情况:
- 我们什么时候才能从这个 class 中获得 CompletableFuture 的新实例? → 我们需要 return 一个 MDC 感知版本.
- 我们什么时候才能从外部获得 CompletableFuture 的新实例 class? → 我们需要 return 相同的 MDC 感知版本.
- 在 CompletableFuture 中使用哪个执行器 class? → 在所有情况下,我们需要确保所有执行器都是 MDC 感知的
为此,让我们通过扩展它来创建 CompletableFuture
的 MDC 感知版本 class。我的版本如下所示
import org.slf4j.MDC;
import java.util.Map;
import java.util.concurrent.*;
import java.util.function.Function;
import java.util.function.Supplier;
public class MDCAwareCompletableFuture<T> extends CompletableFuture<T> {
public static final ExecutorService MDC_AWARE_ASYNC_POOL = new MDCAwareForkJoinPool();
@Override
public CompletableFuture newIncompleteFuture() {
return new MDCAwareCompletableFuture();
}
@Override
public Executor defaultExecutor() {
return MDC_AWARE_ASYNC_POOL;
}
public static <T> CompletionStage<T> getMDCAwareCompletionStage(CompletableFuture<T> future) {
return new MDCAwareCompletableFuture<>()
.completeAsync(() -> null)
.thenCombineAsync(future, (aVoid, value) -> value);
}
public static <T> CompletionStage<T> getMDCHandledCompletionStage(CompletableFuture<T> future,
Function<Throwable, T> throwableFunction) {
Map<String, String> contextMap = MDC.getCopyOfContextMap();
return getMDCAwareCompletionStage(future)
.handle((value, throwable) -> {
setMDCContext(contextMap);
if (throwable != null) {
return throwableFunction.apply(throwable);
}
return value;
});
}
}
MDCAwareForkJoinPool
class 看起来像(为简单起见跳过了带有 ForkJoinTask
参数的方法)
public class MDCAwareForkJoinPool extends ForkJoinPool {
//Override constructors which you need
@Override
public <T> ForkJoinTask<T> submit(Callable<T> task) {
return super.submit(MDCUtility.wrapWithMdcContext(task));
}
@Override
public <T> ForkJoinTask<T> submit(Runnable task, T result) {
return super.submit(wrapWithMdcContext(task), result);
}
@Override
public ForkJoinTask<?> submit(Runnable task) {
return super.submit(wrapWithMdcContext(task));
}
@Override
public void execute(Runnable task) {
super.execute(wrapWithMdcContext(task));
}
}
包装的实用方法如下
public static <T> Callable<T> wrapWithMdcContext(Callable<T> task) {
//save the current MDC context
Map<String, String> contextMap = MDC.getCopyOfContextMap();
return () -> {
setMDCContext(contextMap);
try {
return task.call();
} finally {
// once the task is complete, clear MDC
MDC.clear();
}
};
}
public static Runnable wrapWithMdcContext(Runnable task) {
//save the current MDC context
Map<String, String> contextMap = MDC.getCopyOfContextMap();
return () -> {
setMDCContext(contextMap);
try {
return task.run();
} finally {
// once the task is complete, clear MDC
MDC.clear();
}
};
}
public static void setMDCContext(Map<String, String> contextMap) {
MDC.clear();
if (contextMap != null) {
MDC.setContextMap(contextMap);
}
}
以下是一些使用指南:
- 使用 class
MDCAwareCompletableFuture
而不是 class CompletableFuture
。
- class
CompletableFuture
中的几个方法实例化了自我版本,例如 new CompletableFuture...
。对于此类方法(大多数 public 静态方法),请使用替代方法获取 MDCAwareCompletableFuture
的实例。使用替代方法的示例可以是 CompletableFuture.supplyAsync(...)
,您可以选择 new MDCAwareCompletableFuture<>().completeAsync(...)
- 使用方法
getMDCAwareCompletionStage
将 CompletableFuture
的实例转换为 MDCAwareCompletableFuture
当你因为说某个外部库 return CompletableFuture
的实例。显然,您无法在该库中保留上下文,但此方法在您的代码命中应用程序代码后仍会保留上下文。
- 在提供执行器作为参数时,确保它是 MDC 感知的,例如
MDCAwareForkJoinPool
。您也可以通过覆盖 execute
方法来创建 MDCAwareThreadPoolExecutor
来为您的用例服务。你明白了!
这样,您的代码将如下所示
List<CompletableFuture<UpdateHotelAllotmentsRsp>> futures =
tasks.stream()
new MDCAwareCompletableFuture<UpdateHotelAllotmentsRsp>().completeAsync(
() -> businesslogic(task))
.collect(Collectors.toList());
List results = futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
public UpdateHotelAllotmentsRsp businesslogic(Task task) {
LOGGER.info("mdc fishtag context is not lost here");
}
您可以在 post 中找到上述所有内容的 详细解释 。
执行异步 CompletableFuture
时,父线程上下文以及 org.slf4j.MDC
上下文都会丢失。
这很糟糕,因为我正在使用某种 "fish tagging" 来跟踪来自多个日志文件中的一个请求的日志。
MDC.put("fishid", randomId())
问题:一般情况下,如何在 CompletableFutures
的任务中保留该 ID?
List<CompletableFuture<UpdateHotelAllotmentsRsp>> futures =
tasks.stream()
.map(task -> CompletableFuture.supplyAsync(
() -> businesslogic(task))
.collect(Collectors.toList());
List results = futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
public void businesslogic(Task task) {
LOGGER.info("mdc fishtag context is lost here");
}
最后,我创建了一个保留 MDC
的 Supplier
包装器。如果大家有更好的想法欢迎评论。
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) {
return CompletableFuture.supplyAsync(new SupplierMDC(supplier), executor);
}
private static class SupplierMDC<T> implements Supplier<T> {
private final Supplier<T> delegate;
private final Map<String, String> mdc;
public SupplierMDC(Supplier<T> delegate) {
this.delegate = delegate;
this.mdc = MDC.getCopyOfContextMap();
}
@Override
public T get() {
MDC.setContextMap(mdc);
return delegate.get();
}
}
是的,Twitter Future 做到了这一点。他们有 class Local.scala Future.scala 知道的。
此修复程序供 java 作者修复此问题,以便您的本地状态遍历所有使用 CompletableFutures 的库。基本上,Local.scala 由 Future 使用并在内部使用 ThreadLocal 直到 .thenApply 或 .thenAccept 并且它将捕获状态并在需要时将其传输到下一个。这适用于所有第三方库,第 3 方库更改为零。
这里还有更多,但请戳 Java 作者修复他们的东西... http://mail.openjdk.java.net/pipermail/core-libs-dev/2017-May/047867.html
到那时,MDC 将永远不会通过第 3 方库工作。
我的 SO post Does CompletableFuture have a corresponding Local context?
我解决这个问题的最可读的方法如下 -
----------------线程实用程序class--------------------
public static Runnable withMdc(Runnable runnable) {
Map<String, String> mdc = MDC.getCopyOfContextMap();
return () -> {
MDC.setContextMap(mdc);
runnable.run();
};
}
public static <U> Supplier<U> withMdc(Supplier<U> supplier) {
Map<String, String> mdc = MDC.getCopyOfContextMap();
return (Supplier) () -> {
MDC.setContextMap(mdc);
return supplier.get();
};
}
----------------用法-------------
CompletableFuture.supplyAsync(withMdc(() -> someSupplier()))
.thenRunAsync(withMdc(() -> someRunnable())
....
ThreadUtils 中的 WithMdc 必须重载以包含 CompletableFuture 接受的其他功能接口
请注意,withMdc() 方法是静态导入的,以提高可读性。
我的解决方案主题是(它将与 JDK 9+ 一起使用,因为自该版本以来公开了几个可覆盖的方法)
Make the complete ecosystem aware of MDC
为此,我们需要解决以下情况:
- 我们什么时候才能从这个 class 中获得 CompletableFuture 的新实例? → 我们需要 return 一个 MDC 感知版本.
- 我们什么时候才能从外部获得 CompletableFuture 的新实例 class? → 我们需要 return 相同的 MDC 感知版本.
- 在 CompletableFuture 中使用哪个执行器 class? → 在所有情况下,我们需要确保所有执行器都是 MDC 感知的
为此,让我们通过扩展它来创建 CompletableFuture
的 MDC 感知版本 class。我的版本如下所示
import org.slf4j.MDC;
import java.util.Map;
import java.util.concurrent.*;
import java.util.function.Function;
import java.util.function.Supplier;
public class MDCAwareCompletableFuture<T> extends CompletableFuture<T> {
public static final ExecutorService MDC_AWARE_ASYNC_POOL = new MDCAwareForkJoinPool();
@Override
public CompletableFuture newIncompleteFuture() {
return new MDCAwareCompletableFuture();
}
@Override
public Executor defaultExecutor() {
return MDC_AWARE_ASYNC_POOL;
}
public static <T> CompletionStage<T> getMDCAwareCompletionStage(CompletableFuture<T> future) {
return new MDCAwareCompletableFuture<>()
.completeAsync(() -> null)
.thenCombineAsync(future, (aVoid, value) -> value);
}
public static <T> CompletionStage<T> getMDCHandledCompletionStage(CompletableFuture<T> future,
Function<Throwable, T> throwableFunction) {
Map<String, String> contextMap = MDC.getCopyOfContextMap();
return getMDCAwareCompletionStage(future)
.handle((value, throwable) -> {
setMDCContext(contextMap);
if (throwable != null) {
return throwableFunction.apply(throwable);
}
return value;
});
}
}
MDCAwareForkJoinPool
class 看起来像(为简单起见跳过了带有 ForkJoinTask
参数的方法)
public class MDCAwareForkJoinPool extends ForkJoinPool {
//Override constructors which you need
@Override
public <T> ForkJoinTask<T> submit(Callable<T> task) {
return super.submit(MDCUtility.wrapWithMdcContext(task));
}
@Override
public <T> ForkJoinTask<T> submit(Runnable task, T result) {
return super.submit(wrapWithMdcContext(task), result);
}
@Override
public ForkJoinTask<?> submit(Runnable task) {
return super.submit(wrapWithMdcContext(task));
}
@Override
public void execute(Runnable task) {
super.execute(wrapWithMdcContext(task));
}
}
包装的实用方法如下
public static <T> Callable<T> wrapWithMdcContext(Callable<T> task) {
//save the current MDC context
Map<String, String> contextMap = MDC.getCopyOfContextMap();
return () -> {
setMDCContext(contextMap);
try {
return task.call();
} finally {
// once the task is complete, clear MDC
MDC.clear();
}
};
}
public static Runnable wrapWithMdcContext(Runnable task) {
//save the current MDC context
Map<String, String> contextMap = MDC.getCopyOfContextMap();
return () -> {
setMDCContext(contextMap);
try {
return task.run();
} finally {
// once the task is complete, clear MDC
MDC.clear();
}
};
}
public static void setMDCContext(Map<String, String> contextMap) {
MDC.clear();
if (contextMap != null) {
MDC.setContextMap(contextMap);
}
}
以下是一些使用指南:
- 使用 class
MDCAwareCompletableFuture
而不是 classCompletableFuture
。 - class
CompletableFuture
中的几个方法实例化了自我版本,例如new CompletableFuture...
。对于此类方法(大多数 public 静态方法),请使用替代方法获取MDCAwareCompletableFuture
的实例。使用替代方法的示例可以是CompletableFuture.supplyAsync(...)
,您可以选择new MDCAwareCompletableFuture<>().completeAsync(...)
- 使用方法
getMDCAwareCompletionStage
将CompletableFuture
的实例转换为MDCAwareCompletableFuture
当你因为说某个外部库 returnCompletableFuture
的实例。显然,您无法在该库中保留上下文,但此方法在您的代码命中应用程序代码后仍会保留上下文。 - 在提供执行器作为参数时,确保它是 MDC 感知的,例如
MDCAwareForkJoinPool
。您也可以通过覆盖execute
方法来创建MDCAwareThreadPoolExecutor
来为您的用例服务。你明白了!
这样,您的代码将如下所示
List<CompletableFuture<UpdateHotelAllotmentsRsp>> futures =
tasks.stream()
new MDCAwareCompletableFuture<UpdateHotelAllotmentsRsp>().completeAsync(
() -> businesslogic(task))
.collect(Collectors.toList());
List results = futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
public UpdateHotelAllotmentsRsp businesslogic(Task task) {
LOGGER.info("mdc fishtag context is not lost here");
}
您可以在 post 中找到上述所有内容的 详细解释 。