Spring 启动自定义@Async Wrap Callable
Spring Boot Custom @Async Wrap Callable
我正在开发一个支持多租户的应用程序。租户的唯一标识符存储在本地线程中,可以通过某些服务访问。
为了允许并行处理,我创建了一个 Callable 包装器,设置线程局部变量:
class TenantAwareCallable<T> implements Callable<T> {
private final String tenantName;
private final Callable<T> delegate;
TenantAwareCallable(Callable<T> delegate, String tenantName) {
this.delegate = delegate;
this.tenantName = tenantName;
}
@Override
public T call() throws Exception {
// set threadlocal
TenantContext.setCurrentTenantName(tenantName);
try {
return delegate.call();
} catch (Exception e) {
// log and handle
} finally {
TenantContext.clear();
}
}
}
这已经可以在应用程序中使用。但是我想要的是一些自定义的 @Async
注释,例如 @TenantAwareAsync
或 @TenantPreservingAsync
,它包装了由 Spring 创建的可调用对象,然后执行它。
有什么方法可以开始吗?
提前致谢!
我有一个可行的解决方案,所以我认为在这里分享它可能有一天会对某些人有所帮助。
我不是通过使用 TenantAwareCallable
而是通过自定义 Executor 服务来解决它的。我选择扩展 Spring 的 ThreadPoolTaskScheduler(因为这是我们在项目中使用的)。
public class ContextAwareThreadPoolTaskScheduler extends ThreadPoolTaskScheduler {
@Override
protected ScheduledExecutorService createExecutor(int poolSize, ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
return new ContextAwareThreadPoolTaskExecutor(poolSize, threadFactory, rejectedExecutionHandler);
}
}
上下文数据的实际设置是在自定义的 ScheduledThreadPoolExecutor
:
中完成的
public class ContextAwareThreadPoolTaskExecutor extends ScheduledThreadPoolExecutor {
public ContextAwareThreadPoolTaskExecutor(int poolSize, ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
super(poolSize, threadFactory, rejectedExecutionHandler);
}
@Override
protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> callable, RunnableScheduledFuture<V> task) {
return new ContextAwareTask<V>(task);
}
@Override
protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {
return new ContextAwareTask<V>(task);
}
static private class ContextAwareTask<T> implements RunnableScheduledFuture<T> {
private final RunnableScheduledFuture<T> delegate;
private final TenantContextHolder multitenantContextHolder;
private final LoggingContextHolder loggingContextHolder;
private final SecurityContext securityContext;
ContextAwareTask(RunnableScheduledFuture<T> delegate) {
this.delegate = delegate;
multitenantContextHolder = TenantContextHolder.newInstance();
loggingContextHolder = LoggingContextHolder.newInstance();
securityContext = SecurityContextHolder.getContext();
}
@Override
public void run() {
multitenantContextHolder.apply();
loggingContextHolder.apply();
SecurityContextHolder.setContext(securityContext);
delegate.run();
SecurityContextHolder.clearContext();
loggingContextHolder.clear();
multitenantContextHolder.clear();
}
// all other methods are just delegates
}
}
持有者基本上只是存储上下文状态并将其应用于新线程的对象。
public class TenantContextHolder {
private String tenantName;
public static TenantContextHolder newInstance() {
return new TenantContextHolder();
}
private TenantContextHolder() {
this.tenantName = TenantContext.getCurrentTenantName();
}
public void apply() {
TenantContext.setCurrentTenantName(tenantName);
}
public void clear() {
TenantContext.clear();
}
}
然后可以在 Spring 环境中配置调度程序的自定义实现。
@Configuration
public class AsyncConfiguration implements AsyncConfigurer {
private ThreadPoolTaskScheduler taskScheduler;
@Bean
public ThreadPoolTaskScheduler taskScheduler() {
if (taskScheduler == null) {
taskScheduler = new ContextAwareThreadPoolTaskScheduler();
}
return taskScheduler;
}
@Override
public Executor getAsyncExecutor() {
return taskScheduler();
}
}
我正在开发一个支持多租户的应用程序。租户的唯一标识符存储在本地线程中,可以通过某些服务访问。 为了允许并行处理,我创建了一个 Callable 包装器,设置线程局部变量:
class TenantAwareCallable<T> implements Callable<T> {
private final String tenantName;
private final Callable<T> delegate;
TenantAwareCallable(Callable<T> delegate, String tenantName) {
this.delegate = delegate;
this.tenantName = tenantName;
}
@Override
public T call() throws Exception {
// set threadlocal
TenantContext.setCurrentTenantName(tenantName);
try {
return delegate.call();
} catch (Exception e) {
// log and handle
} finally {
TenantContext.clear();
}
}
}
这已经可以在应用程序中使用。但是我想要的是一些自定义的 @Async
注释,例如 @TenantAwareAsync
或 @TenantPreservingAsync
,它包装了由 Spring 创建的可调用对象,然后执行它。
有什么方法可以开始吗?
提前致谢!
我有一个可行的解决方案,所以我认为在这里分享它可能有一天会对某些人有所帮助。
我不是通过使用 TenantAwareCallable
而是通过自定义 Executor 服务来解决它的。我选择扩展 Spring 的 ThreadPoolTaskScheduler(因为这是我们在项目中使用的)。
public class ContextAwareThreadPoolTaskScheduler extends ThreadPoolTaskScheduler {
@Override
protected ScheduledExecutorService createExecutor(int poolSize, ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
return new ContextAwareThreadPoolTaskExecutor(poolSize, threadFactory, rejectedExecutionHandler);
}
}
上下文数据的实际设置是在自定义的 ScheduledThreadPoolExecutor
:
public class ContextAwareThreadPoolTaskExecutor extends ScheduledThreadPoolExecutor {
public ContextAwareThreadPoolTaskExecutor(int poolSize, ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
super(poolSize, threadFactory, rejectedExecutionHandler);
}
@Override
protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> callable, RunnableScheduledFuture<V> task) {
return new ContextAwareTask<V>(task);
}
@Override
protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {
return new ContextAwareTask<V>(task);
}
static private class ContextAwareTask<T> implements RunnableScheduledFuture<T> {
private final RunnableScheduledFuture<T> delegate;
private final TenantContextHolder multitenantContextHolder;
private final LoggingContextHolder loggingContextHolder;
private final SecurityContext securityContext;
ContextAwareTask(RunnableScheduledFuture<T> delegate) {
this.delegate = delegate;
multitenantContextHolder = TenantContextHolder.newInstance();
loggingContextHolder = LoggingContextHolder.newInstance();
securityContext = SecurityContextHolder.getContext();
}
@Override
public void run() {
multitenantContextHolder.apply();
loggingContextHolder.apply();
SecurityContextHolder.setContext(securityContext);
delegate.run();
SecurityContextHolder.clearContext();
loggingContextHolder.clear();
multitenantContextHolder.clear();
}
// all other methods are just delegates
}
}
持有者基本上只是存储上下文状态并将其应用于新线程的对象。
public class TenantContextHolder {
private String tenantName;
public static TenantContextHolder newInstance() {
return new TenantContextHolder();
}
private TenantContextHolder() {
this.tenantName = TenantContext.getCurrentTenantName();
}
public void apply() {
TenantContext.setCurrentTenantName(tenantName);
}
public void clear() {
TenantContext.clear();
}
}
然后可以在 Spring 环境中配置调度程序的自定义实现。
@Configuration
public class AsyncConfiguration implements AsyncConfigurer {
private ThreadPoolTaskScheduler taskScheduler;
@Bean
public ThreadPoolTaskScheduler taskScheduler() {
if (taskScheduler == null) {
taskScheduler = new ContextAwareThreadPoolTaskScheduler();
}
return taskScheduler;
}
@Override
public Executor getAsyncExecutor() {
return taskScheduler();
}
}