Advice precedence problem when one @Around advice does not proceed
Updated to rephrase the question with additional information
We have two annotations:
CustomLogging
PollableStreamListener
Both are implemented as aspects using Spring AOP.
CustomLogging annotation:
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface CustomLogging {
}
CustomLoggingAspect class:
@Aspect
@Component
@Slf4j
@Order(value = 1)
public class CustomLoggingAspect {
    @Before("@annotation(customLogging)")
    public void addCustomLogging(CustomLogging customLogging) {
        log.info("Logging some information");
    }
}
PollableStreamListener annotation:
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface PollableStreamListener {
}
PollableStreamListenerAspect class:
@Aspect
@Component
@Slf4j
public class PollableStreamListenerAspect {
    private final ExecutorService executor = Executors.newFixedThreadPool(1);
    private volatile boolean paused = false;

    @Around(value = "@annotation(pollableStreamListener) && args(dataCapsule,..)")
    public void receiveMessage(ProceedingJoinPoint joinPoint,
            PollableStreamListener pollableStreamListener, Object dataCapsule) {
        if (dataCapsule instanceof Message) {
            Message<?> message = (Message<?>) dataCapsule;
            AcknowledgmentCallback callback = StaticMessageHeaderAccessor
                    .getAcknowledgmentCallback(message);
            callback.noAutoAck();
            if (!paused) {
                // The separate thread is not busy with a previous message, so process this message:
                Runnable runnable = () -> {
                    try {
                        paused = true;
                        // Call method to process this Kafka message
                        joinPoint.proceed();
                        callback.acknowledge(Status.ACCEPT);
                    } catch (Throwable e) {
                        callback.acknowledge(Status.REJECT);
                        throw new PollableStreamListenerException(e);
                    } finally {
                        paused = false;
                    }
                };
                executor.submit(runnable);
            } else {
                // The separate thread is busy with a previous message, so re-queue this message for later:
                callback.acknowledge(Status.REQUEUE);
                log.info("Re-queue");
            }
        }
    }
}
We have a class named CleanupController that runs periodically on a schedule.
CleanupController class:
@Scheduled(fixedDelayString = "${app.pollable-consumer.time-interval}")
public void pollForDeletionRequest() {
    log.trace("Polling for new messages");
    cleanupInput.poll(cleanupSubmissionService::submitDeletion);
}
When the schedule fires, it calls a method in another class that is annotated with both PollableStreamListener and CustomLogging. I added a Thread.sleep() to simulate a method that takes a while to execute.
@PollableStreamListener
@CustomLogging
public void submitDeletion(Message<?> received) {
    try {
        log.info("Starting processing");
        Thread.sleep(10000);
        log.info("Finished processing");
    } catch (Exception e) {
        log.info("Error", e);
    }
}
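The problem I'm facing is that the output produced by CustomLogging is printed every time we poll for new messages with @Scheduled, but I only want it printed when the annotated method actually executes (which may happen now or at some point in the future, depending on whether another message is currently being processed). This leads to confusing log messages, because they imply that the message is being processed now, when in fact it has been re-queued for later execution.
Is there a way to make these annotations work well together, so that the CustomLogging output only occurs when the annotated method executes?
Update: using @Order on PollableStreamListener
Following @dunni's suggestion, I made the following changes to the original example above.
Set the order to 1 on PollableStreamListenerAspect: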
@Aspect
@Component
@Slf4j
@Order(value = 1)
public class PollableStreamListenerAspect {
...
}
Increase the order to 2 on CustomLoggingAspect:
@Aspect
@Component
@Slf4j
@Order(value = 2)
public class CustomLoggingAspect {
...
}
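After making these changes, I found that polling no longer detects new requests at all. It is the change on PollableStreamListenerAspect that introduces this problem (I commented out that line and re-ran it, and everything behaved as before).
Update: using @Order(value = Ordered.HIGHEST_PRECEDENCE) on PollableStreamListener
I have updated PollableStreamListener to use HIGHEST_PRECEDENCE and updated the @Around value: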
@Aspect
@Component
@Slf4j
@Order(value = Ordered.HIGHEST_PRECEDENCE)
public class PollableStreamListenerAspect {
    private final ExecutorService executor = Executors.newFixedThreadPool(1);
    private volatile boolean paused = false;

    @Around(value = "@annotation(au.com.brolly.commons.stream.PollableStreamListener)")
    public void receiveMessage(ProceedingJoinPoint joinPoint) {
        if (!paused) {
            // The separate thread is not busy with a previous message, so process this message:
            Runnable runnable = () -> {
                try {
                    paused = true;
                    // Call method to process this Kafka message
                    joinPoint.proceed();
                } catch (Throwable e) {
                    e.printStackTrace();
                    throw new PollableStreamListenerException(e);
                } finally {
                    paused = false;
                }
            };
            executor.submit(runnable);
        } else {
            // The separate thread is busy with a previous message, so re-queue this message for later:
            log.info("Re-queue");
        }
    }
}
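This now partially works. When I send a Kafka message it gets processed, and the logging from the CustomLogging annotation is only printed when no other Kafka message is being processed. So far so good.
The next challenge is to get the @Around advice to accept the Message delivered via Kafka. I tried changing the following lines in the example above: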
@Around(value = "@annotation(au.com.brolly.commons.stream.PollableStreamListener) && args(dataCapsule,..)")
public void receiveMessage(ProceedingJoinPoint joinPoint, Object dataCapsule) {
...
}
The server starts normally, but when I publish a Kafka message I get the following exception:
2021-04-22 10:38:00,055 ERROR [scheduling-1] org.springframework.core.log.LogAccessor: org.springframework.messaging.MessageHandlingException: nested exception is java.lang.IllegalStateException: Required to bind 2 arguments, but only bound 1 (JoinPointMatch was NOT bound in invocation), failedMessage=GenericMessage...
at org.springframework.cloud.stream.binder.DefaultPollableMessageSource.doHandleMessage(DefaultPollableMessageSource.java:330)
at org.springframework.cloud.stream.binder.DefaultPollableMessageSource.handle(DefaultPollableMessageSource.java:361)
at org.springframework.cloud.stream.binder.DefaultPollableMessageSource.poll(DefaultPollableMessageSource.java:219)
at org.springframework.cloud.stream.binder.DefaultPollableMessageSource.poll(DefaultPollableMessageSource.java:200)
at org.springframework.cloud.stream.binder.DefaultPollableMessageSource.poll(DefaultPollableMessageSource.java:68)
at xyx.pollForDeletionRequest(CleanupController.java:35)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.IllegalStateException: Required to bind 2 arguments, but only bound 1 (JoinPointMatch was NOT bound in invocation)
at org.springframework.aop.aspectj.AbstractAspectJAdvice.argBinding(AbstractAspectJAdvice.java:596)
at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethod(AbstractAspectJAdvice.java:624)
at org.springframework.aop.aspectj.AspectJAroundAdvice.invoke(AspectJAroundAdvice.java:72)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:175)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692)
at xyz.CleanupSubmissionServiceImpl$$EnhancerBySpringCGLIB$37f6f8.submitDeletion(<generated>)
at org.springframework.cloud.stream.binder.DefaultPollableMessageSource.doHandleMessage(DefaultPollableMessageSource.java:327)
... 17 more
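You can add the @Order annotation to both aspects to define the order. If it is not provided, the execution order of the aspects is undefined.
For example, if the logging aspect should always run as the last advice, regardless of what other advice exists, give it an order of, say, 10000 and give all other advice lower numbers (lower numbers execute first).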
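To make the "lower numbers execute first" rule concrete, here is a plain-Java sketch that mimics an advice chain. It does not use Spring at all: the Advice interface and the gate and logging helpers are hypothetical names invented for this illustration, and the chain is only a rough model of how an outer advice wraps an inner one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class AdviceOrderSketch {

    /** Hypothetical stand-in for an advice; this is not a Spring type. */
    public interface Advice {
        int order();
        void invoke(Runnable next, List<String> log);
    }

    /** Mimics a @Before advice: logs, then always continues down the chain. */
    public static Advice logging(int order) {
        return new Advice() {
            public int order() { return order; }
            public void invoke(Runnable next, List<String> log) {
                log.add("Logging");
                next.run();
            }
        };
    }

    /** Mimics an @Around advice that only proceeds when it is allowed to. */
    public static Advice gate(int order, boolean proceed) {
        return new Advice() {
            public int order() { return order; }
            public void invoke(Runnable next, List<String> log) {
                if (proceed) {
                    next.run();
                } else {
                    log.add("Re-queue");
                }
            }
        };
    }

    /** Sorts by order (lowest number = outermost) and runs the chain around the target. */
    public static List<String> run(List<Advice> advices) {
        List<Advice> sorted = new ArrayList<>(advices);
        sorted.sort(Comparator.comparingInt(Advice::order));
        List<String> log = new ArrayList<>();
        Runnable chain = () -> log.add("Target");
        // Wrap from the innermost advice outwards.
        for (int i = sorted.size() - 1; i >= 0; i--) {
            Advice advice = sorted.get(i);
            Runnable next = chain;
            chain = () -> advice.invoke(next, log);
        }
        chain.run();
        return log;
    }

    public static void main(String[] args) {
        // Gate outermost (order 1), logging inside it (order 2).
        System.out.println(run(Arrays.asList(gate(1, false), logging(2)))); // prints [Re-queue]
        // Logging outermost (order 1): it fires although the gate declines afterwards.
        System.out.println(run(Arrays.asList(logging(1), gate(2, false)))); // prints [Logging, Re-queue]
    }
}
```

In this model the logging advice stays silent on re-queued calls only when the gate has the lower order number, which is the arrangement the question is after.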
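Because of this problem, you need to use @Order(value = Ordered.HIGHEST_PRECEDENCE) on PollableStreamListenerAspect. It is weird indeed, but then it works as you wish. IMO the problem should be fixed in Spring, though. Having to use this workaround is ugly, and it only works if the aspect that calls proceed() asynchronously really does have the highest precedence, which is not always the case. As an alternative, you could use native AspectJ and its own concept of declaring advice precedence, which is independent of Spring internals.
Here is a simplified version of your application as an MCVE:
Annotations: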
package de.scrum_master.spring.q67155048;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface CustomLogging {}
package de.scrum_master.spring.q67155048;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface PollableStreamListener {}
Component with a method carrying both annotations:
package de.scrum_master.spring.q67155048;
import org.springframework.stereotype.Component;
@Component
public class MyComponent {
    private int counter = 0;

    @PollableStreamListener
    @CustomLogging
    public void submitDeletion() {
        try {
            System.out.println(" Starting processing #" + ++counter);
            Thread.sleep(1000);
            System.out.println(" Finished processing #" + counter);
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Aspects:
package de.scrum_master.spring.q67155048;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.springframework.stereotype.Component;
@Aspect
@Component
public class CustomLoggingAspect {
    @Before("@annotation(de.scrum_master.spring.q67155048.CustomLogging)")
    public void addCustomLogging() {
        System.out.println("Logging");
    }
}

package de.scrum_master.spring.q67155048;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Aspect
@Component
@Order(value = Ordered.HIGHEST_PRECEDENCE)
public class PollableStreamListenerAspect {
    public static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(1);
    private volatile boolean paused = false;

    @Around("@annotation(de.scrum_master.spring.q67155048.PollableStreamListener)")
    public void receiveMessage(ProceedingJoinPoint joinPoint) throws Throwable {
        System.out.println("Receiving message");
        if (!paused) {
            // The separate thread is not busy with a previous message, so process this message:
            Runnable runnable = () -> {
                try {
                    paused = true;
                    joinPoint.proceed();
                }
                catch (Throwable throwable) {
                    throwable.printStackTrace();
                }
                finally {
                    paused = false;
                }
            };
            EXECUTOR_SERVICE.submit(runnable);
        }
        else {
            System.out.println(" Re-queue");
        }
    }
}
Driver application:
The application calls the target method every 500 ms, but execution takes 1,000 ms. So in this case we expect to see ~50% of the calls being re-queued without any logging, because the higher-precedence aspect does not proceed to the target method.
package de.scrum_master.spring.q67155048;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Configuration;
@SpringBootApplication
@Configuration
public class DemoApplication {
    public static void main(String[] args) throws InterruptedException {
        try (ConfigurableApplicationContext appContext = SpringApplication.run(DemoApplication.class, args)) {
            doStuff(appContext);
        }
    }

    private static void doStuff(ConfigurableApplicationContext appContext) throws InterruptedException {
        MyComponent myComponent = appContext.getBean(MyComponent.class);
        for (int i = 0; i < 10; i++) {
            myComponent.submitDeletion();
            Thread.sleep(500);
        }
        // This is just to make the application exit cleanly
        PollableStreamListenerAspect.EXECUTOR_SERVICE.shutdown();
    }
}
Console log:
. ____ _ __ _ _
/\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.1.8.RELEASE)
2021-04-20 12:56:03.675 INFO 13560 --- [ main] d.s.spring.q67155048.DemoApplication : Starting DemoApplication on Xander-Ultrabook with PID 13560 (C:\Users\alexa\Documents\java-src\spring-aop-playground\target\classes started by alexa in C:\Users\alexa\Documents\java-src\spring-aop-playground)
...
2021-04-20 12:56:07.666 INFO 13560 --- [ main] d.s.spring.q67155048.DemoApplication : Started DemoApplication in 4.65 seconds (JVM running for 7.181)
Receiving message
Logging
Starting processing #1
Receiving message
Re-queue
Finished processing #1
Receiving message
Logging
Starting processing #2
Receiving message
Re-queue
Finished processing #2
Receiving message
Logging
Starting processing #3
Receiving message
Re-queue
Finished processing #3
Receiving message
Logging
Starting processing #4
Receiving message
Re-queue
Finished processing #4
Receiving message
Logging
Starting processing #5
Receiving message
Re-queue
Finished processing #5
2021-04-20 12:56:12.767 INFO 13560 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Shutting down ExecutorService 'applicationTaskExecutor'
...
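See? We count 10x "Receiving message", but only 5x "Re-queue" and 5x "Logging". Note that I numbered the processing calls because they are asynchronous. That way it is easier to follow when they start and finish.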
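The 5/5 split can be checked on paper with an idealised timeline. The sketch below is not part of the MCVE; the class and method names are made up for illustration, and it assumes the worker becomes free exactly when the next call arrives, which real thread timing does not guarantee.

```java
public class RequeueTimelineSketch {

    /**
     * Counts how many calls are accepted when a call arrives every intervalMs
     * and each accepted call keeps the single worker busy for workMs.
     * Returns {accepted, requeued}.
     */
    public static int[] simulate(int calls, long intervalMs, long workMs) {
        long busyUntil = 0; // time at which the worker becomes free again
        int accepted = 0;
        int requeued = 0;
        for (int i = 0; i < calls; i++) {
            long now = i * intervalMs;
            if (now >= busyUntil) {
                accepted++;
                busyUntil = now + workMs;
            } else {
                requeued++;
            }
        }
        return new int[] {accepted, requeued};
    }

    public static void main(String[] args) {
        int[] result = simulate(10, 500, 1000);
        // prints "accepted=5, requeued=5"
        System.out.println("accepted=" + result[0] + ", requeued=" + result[1]);
    }
}
```

In a real run, scheduling jitter can shift which individual calls get re-queued, so treat the numbers as the expected average rather than a guarantee.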
Update in response to user comment:
I updated my MCVE to reproduce your argument binding problem. The new or changed files are:
package de.scrum_master.spring.q67155048;
public class Message<T> {
    private T content;

    public Message(T content) {
        this.content = content;
    }

    @Override
    public String toString() {
        return "Message{" +
            "content=" + content +
            '}';
    }
}

package de.scrum_master.spring.q67155048;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Configuration;
@SpringBootApplication
@Configuration
public class DemoApplication {
    public static void main(String[] args) throws InterruptedException {
        try (ConfigurableApplicationContext appContext = SpringApplication.run(DemoApplication.class, args)) {
            doStuff(appContext);
        }
    }

    private static void doStuff(ConfigurableApplicationContext appContext) throws InterruptedException {
        MyComponent myComponent = appContext.getBean(MyComponent.class);
        Message<String> message = new Message<>("Hi there!");
        for (int i = 0; i < 10; i++) {
            myComponent.submitDeletion(message);
            Thread.sleep(500);
        }
        // This is just to make the application exit cleanly
        PollableStreamListenerAspect.EXECUTOR_SERVICE.shutdown();
    }
}

package de.scrum_master.spring.q67155048;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Aspect
@Component
@Order(value = Ordered.HIGHEST_PRECEDENCE)
public class PollableStreamListenerAspect {
    public static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(1);
    private volatile boolean paused = false;

    @Around("@annotation(de.scrum_master.spring.q67155048.PollableStreamListener) && args(dataCapsule, ..)")
    public void receiveMessage(ProceedingJoinPoint joinPoint, Object dataCapsule) throws Throwable {
        System.out.println("Receiving message");
        if (!paused) {
            // The separate thread is not busy with a previous message, so process this message:
            Runnable runnable = () -> {
                try {
                    paused = true;
                    System.out.println("dataCapsule = " + dataCapsule);
                    joinPoint.proceed();
                }
                catch (Throwable throwable) {
                    throwable.printStackTrace();
                }
                finally {
                    paused = false;
                }
            };
            EXECUTOR_SERVICE.submit(runnable);
        }
        else {
            System.out.println(" Re-queue");
        }
    }
}
Consistent with your own experience, this yields:
Exception in thread "main" java.lang.IllegalStateException: Required to bind 2 arguments, but only bound 1 (JoinPointMatch was NOT bound in invocation)
at org.springframework.aop.aspectj.AbstractAspectJAdvice.argBinding(AbstractAspectJAdvice.java:605)
at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethod(AbstractAspectJAdvice.java:633)
at org.springframework.aop.aspectj.AspectJAroundAdvice.invoke(AspectJAroundAdvice.java:70)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:175)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:688)
at de.scrum_master.spring.q67155048.MyComponent$$EnhancerBySpringCGLIB$baa410d.submitDeletion(<generated>)
at de.scrum_master.spring.q67155048.DemoApplication.doStuff(DemoApplication.java:21)
at de.scrum_master.spring.q67155048.DemoApplication.main(DemoApplication.java:13)
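You are hitting a known problem here, and I have commented on the closed Spring issue #16956, hoping to get a response and to have someone fix it.
For now, your workaround is not to use the elegant AOP argument binding, but to fetch the arguments manually using JoinPoint.getArgs():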
package de.scrum_master.spring.q67155048;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Aspect
@Component
@Order(value = Ordered.HIGHEST_PRECEDENCE)
public class PollableStreamListenerAspect {
    public static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(1);
    private volatile boolean paused = false;

    //@Around("@annotation(de.scrum_master.spring.q67155048.PollableStreamListener) && args(dataCapsule, ..)")
    @Around("@annotation(de.scrum_master.spring.q67155048.PollableStreamListener) && execution(* *(*, ..))")
    //public void receiveMessage(ProceedingJoinPoint joinPoint, Object dataCapsule) throws Throwable {
    public void receiveMessage(ProceedingJoinPoint joinPoint) throws Throwable {
        Object dataCapsule = joinPoint.getArgs()[0];
        System.out.println("Receiving message");
        if (!paused) {
            // The separate thread is not busy with a previous message, so process this message:
            Runnable runnable = () -> {
                try {
                    paused = true;
                    System.out.println("dataCapsule = " + dataCapsule);
                    joinPoint.proceed();
                }
                catch (Throwable throwable) {
                    throwable.printStackTrace();
                }
                finally {
                    paused = false;
                }
            };
            EXECUTOR_SERVICE.submit(runnable);
        }
        else {
            System.out.println(" Re-queue");
        }
    }
}
Now it works again, like this:
Receiving message
dataCapsule = Message{content=Hi there!}
Logging
Starting processing #1, message = Message{content=Hi there!}
Receiving message
Re-queue
Receiving message
Re-queue
Finished processing #1, message = Message{content=Hi there!}
Receiving message
dataCapsule = Message{content=Hi there!}
Logging
Starting processing #2, message = Message{content=Hi there!}
Receiving message
Re-queue
Finished processing #2, message = Message{content=Hi there!}
Receiving message
dataCapsule = Message{content=Hi there!}
Logging
Starting processing #3, message = Message{content=Hi there!}
Receiving message
Re-queue
Finished processing #3, message = Message{content=Hi there!}
Receiving message
dataCapsule = Message{content=Hi there!}
Logging
Starting processing #4, message = Message{content=Hi there!}
Receiving message
Re-queue
Finished processing #4, message = Message{content=Hi there!}
Receiving message
dataCapsule = Message{content=Hi there!}
Logging
Starting processing #5, message = Message{content=Hi there!}
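One detail the aspects above leave open: paused is only set to true once the worker thread actually starts running the task, so two calls arriving very close together could both pass the !paused check. A possible way to close that gap, sketched here in plain Java without Spring, is to claim the slot on the calling thread with AtomicBoolean.compareAndSet before submitting. The class and method names are hypothetical and this is only an illustration, not a drop-in replacement for the aspect.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class SingleSlotGateSketch {
    private final ExecutorService executor = Executors.newFixedThreadPool(1);
    private final AtomicBoolean busy = new AtomicBoolean(false);

    /**
     * Runs the task on the worker thread and returns true if the slot is free;
     * returns false (the caller should re-queue) if a task is already in flight.
     */
    public boolean trySubmit(Runnable task) {
        // Claim the slot on the calling thread, before handing the task over.
        if (!busy.compareAndSet(false, true)) {
            return false;
        }
        try {
            executor.submit(() -> {
                try {
                    task.run();
                } finally {
                    busy.set(false); // free the slot even if the task throws
                }
            });
        } catch (RuntimeException e) {
            busy.set(false); // submission itself failed, e.g. executor already shut down
            throw e;
        }
        return true;
    }

    public void shutdown() {
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Demo: the second submission arrives while the first task is still running. */
    public static boolean[] demo() {
        SingleSlotGateSketch gate = new SingleSlotGateSketch();
        CountDownLatch release = new CountDownLatch(1);
        boolean first = gate.trySubmit(() -> awaitQuietly(release));
        boolean second = gate.trySubmit(() -> { });
        release.countDown();
        gate.shutdown();
        return new boolean[] {first, second};
    }

    public static void main(String[] args) {
        boolean[] result = demo();
        // prints "first=true, second=false"
        System.out.println("first=" + result[0] + ", second=" + result[1]);
    }
}
```

In the aspect, the false branch would correspond to the "Re-queue" path and the submitted task to the call to joinPoint.proceed().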
已更新以使用附加信息重新表述问题
我们有两个注释:
CustomLogging
PollableStreamListener
两者都是使用 Spring AOP 的方面实现的。
CustomLogging
注解:
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface CustomLogging {
}
CustomLoggingAspect
class:
@Aspect
@Component
@Slf4j
@Order(value = 1)
public class CustomLoggingAspect {
@Before("@annotation(customLogging)")
public void addCustomLogging(CustomLogging customLogging) {
log.info("Logging some information");
}
}
PollableStreamListener
注解:
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface PollableStreamListener {
}
PollableStreamListenerAspect
class:
@Aspect
@Component
@Slf4j
public class PollableStreamListenerAspect {
private final ExecutorService executor = Executors.newFixedThreadPool(1);
private volatile boolean paused = false;
@Around(value = "@annotation(pollableStreamListener) && args(dataCapsule,..)")
public void receiveMessage(ProceedingJoinPoint joinPoint,
PollableStreamListener pollableStreamListener, Object dataCapsule) {
if (dataCapsule instanceof Message) {
Message<?> message = (Message<?>) dataCapsule;
AcknowledgmentCallback callback = StaticMessageHeaderAccessor
.getAcknowledgmentCallback(message);
callback.noAutoAck();
if (!paused) {
// The separate thread is not busy with a previous message, so process this message:
Runnable runnable = () -> {
try {
paused = true;
// Call method to process this Kafka message
joinPoint.proceed();
callback.acknowledge(Status.ACCEPT);
} catch (Throwable e) {
callback.acknowledge(Status.REJECT);
throw new PollableStreamListenerException(e);
} finally {
paused = false;
}
};
executor.submit(runnable);
} else {
// The separate thread is busy with a previous message, so re-queue this message for later:
callback.acknowledge(Status.REQUEUE);
log.info("Re-queue");
}
}
}
}
我们有一个名为 CleanupController
的 class,它会根据计划定期执行。
CleanupController
class:
@Scheduled(fixedDelayString = "${app.pollable-consumer.time-interval}")
public void pollForDeletionRequest() {
log.trace("Polling for new messages");
cleanupInput.poll(cleanupSubmissionService::submitDeletion);
}
当计划执行时,它会调用另一个 class 中的方法,该方法同时注释有 PollableStreamListener
和 CustomLogging
。我添加了一个 Thread.sleep()
来模仿需要一段时间才能执行的方法。
@PollableStreamListener
@CustomLogging
public void submitDeletion(Message<?> received) {
try {
log.info("Starting processing");
Thread.sleep(10000);
log.info("Finished processing");
} catch (Exception e) {
log.info("Error", e);
}
}
我面临的问题是,每次我们使用 @Schedule
轮询新消息时,都会打印 CustomLogging
产生的输出,但我只希望它在注释方法时打印实际执行(这可能现在发生,也可能在将来发生,具体取决于当前是否正在处理另一条消息)。这会导致日志消息混乱,因为它暗示消息现在正在处理,而实际上它已重新排队等待将来执行。
有没有什么方法可以让这些注解很好地协同工作,以便 CustomLogging
输出只在被注解的方法执行时发生?
更新以在 PollableStreamListener
@Order
根据@dunni的建议,我对上面的原始示例做了如下修改。
在 PollableStreamListenerAspect
上设置 1 的顺序:
@Aspect
@Component
@Slf4j
@Order(value = 1)
public class PollableStreamListenerAspect {
...
}
将订单增加到 2 CustomLoggingAspect
:
@Aspect
@Component
@Slf4j
@Order(value = 2)
public class CustomLoggingAspect {
...
}
我发现在进行这些更改后,轮询根本无法检测到新请求。引入此问题的是 PollableStreamListenerAspect
上的更改(我注释掉了该行并重新 运行 它,并且一切都像以前一样)。
更新以在 PollableStreamListener
@Order(value = Ordered.HIGHEST_PRECEDENCE)
我已将 PollableStreamListener
更新为使用 HIGHEST_PRECEDENCE
并更新了 @Around
值:
@Aspect
@Component
@Slf4j
@Order(value = Ordered.HIGHEST_PRECEDENCE)
public class PollableStreamListenerAspect {
private final ExecutorService executor = Executors.newFixedThreadPool(1);
private volatile boolean paused = false;
@Around(value = "@annotation(au.com.brolly.commons.stream.PollableStreamListener)")
public void receiveMessage(ProceedingJoinPoint joinPoint) {
if (!paused) {
// The separate thread is not busy with a previous message, so process this message:
Runnable runnable = () -> {
try {
paused = true;
// Call method to process this Kafka message
joinPoint.proceed();
} catch (Throwable e) {
e.printStackTrace();
throw new PollableStreamListenerException(e);
} finally {
paused = false;
}
};
executor.submit(runnable);
} else {
// The separate thread is busy with a previous message, so re-queue this message for later:
log.info("Re-queue");
}
}
}
这现在部分有效。当我发送一条 Kafka 消息时,它会得到处理,并且来自 CustomLogging
注释的日志记录仅在未处理另一条 Kafka 消息时打印。到目前为止一切顺利。
下一个挑战是让 @Around
接受通过 Kafka 提供的 Message
。我尝试使用上面的示例更改以下行:
@Around(value = "@annotation(au.com.brolly.commons.stream.PollableStreamListener) && args(dataCapsule,..)")
public void receiveMessage(ProceedingJoinPoint joinPoint, Object dataCapsule) {
...
}
服务器正常启动,但是当我发布Kafka消息时出现以下异常:
2021-04-22 10:38:00,055 ERROR [scheduling-1] org.springframework.core.log.LogAccessor: org.springframework.messaging.MessageHandlingException: nested exception is java.lang.IllegalStateException: Required to bind 2 arguments, but only bound 1 (JoinPointMatch was NOT bound in invocation), failedMessage=GenericMessage...
at org.springframework.cloud.stream.binder.DefaultPollableMessageSource.doHandleMessage(DefaultPollableMessageSource.java:330)
at org.springframework.cloud.stream.binder.DefaultPollableMessageSource.handle(DefaultPollableMessageSource.java:361)
at org.springframework.cloud.stream.binder.DefaultPollableMessageSource.poll(DefaultPollableMessageSource.java:219)
at org.springframework.cloud.stream.binder.DefaultPollableMessageSource.poll(DefaultPollableMessageSource.java:200)
at org.springframework.cloud.stream.binder.DefaultPollableMessageSource.poll(DefaultPollableMessageSource.java:68)
at xyx.pollForDeletionRequest(CleanupController.java:35)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.IllegalStateException: Required to bind 2 arguments, but only bound 1 (JoinPointMatch was NOT bound in invocation)
at org.springframework.aop.aspectj.AbstractAspectJAdvice.argBinding(AbstractAspectJAdvice.java:596)
at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethod(AbstractAspectJAdvice.java:624)
at org.springframework.aop.aspectj.AspectJAroundAdvice.invoke(AspectJAroundAdvice.java:72)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:175)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692)
at xyz.CleanupSubmissionServiceImpl$$EnhancerBySpringCGLIB$37f6f8.submitDeletion(<generated>)
at org.springframework.cloud.stream.binder.DefaultPollableMessageSource.doHandleMessage(DefaultPollableMessageSource.java:327)
... 17 more
您可以在两个方面添加@Order
注释来定义顺序。如果未提供,则方面的执行顺序未定义。
例如,如果 Log 方面应该始终作为最后一个建议执行,那么无论有什么其他建议,都给它例如10000 的订单,以及所有其他建议较低的数字(首先执行较低的数字)。
由于 this problem,您需要在 PollableStreamListenerAspect
上使用 @Order(value = Ordered.HIGHEST_PRECEDENCE)
。确实很奇怪,但是它会按您希望的那样工作。不过,IMO 这个问题应该在 Spring 中得到解决。必须使用此解决方法是丑陋的,并且仅当您异步调用 proceed()
的方面确实具有最高优先级时才有效,但情况并非总是如此。作为替代方案,您可以使用本机 AspectJ 及其自己的声明通知优先级的概念,它独立于 Spring 内部。
这是您的应用程序的简化版本 MCVE:
注解:
package de.scrum_master.spring.q67155048;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface CustomLogging {}
package de.scrum_master.spring.q67155048;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface PollableStreamListener {}
带有两个注释的方法的组件:
package de.scrum_master.spring.q67155048;
import org.springframework.stereotype.Component;
@Component
public class MyComponent {
private int counter = 0;
@PollableStreamListener
@CustomLogging
public void submitDeletion() {
try {
System.out.println(" Starting processing #" + ++counter);
Thread.sleep(1000);
System.out.println(" Finished processing #" + counter);
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
方面:
package de.scrum_master.spring.q67155048;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.springframework.stereotype.Component;
@Aspect
@Component
public class CustomLoggingAspect {
@Before("@annotation(de.scrum_master.spring.q67155048.CustomLogging)")
public void addCustomLogging() {
System.out.println("Logging");
}
}
package de.scrum_master.spring.q67155048;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Aspect
@Component
@Order(value = Ordered.HIGHEST_PRECEDENCE)
public class PollableStreamListenerAspect {
public static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(1);
private volatile boolean paused = false;
@Around("@annotation(de.scrum_master.spring.q67155048.PollableStreamListener)")
public void receiveMessage(ProceedingJoinPoint joinPoint) throws Throwable {
System.out.println("Receiving message");
if (!paused) {
// The separate thread is not busy with a previous message, so process this message:
Runnable runnable = () -> {
try {
paused = true;
joinPoint.proceed();
}
catch (Throwable throwable) {
throwable.printStackTrace();
}
finally {
paused = false;
}
};
EXECUTOR_SERVICE.submit(runnable);
}
else {
System.out.println(" Re-queue");
}
}
}
驱动申请:
应用程序每 500 毫秒调用一次目标方法,但执行需要 1,000 毫秒。因此,在这种情况下,我们希望看到约 50% 的调用在没有任何日志记录的情况下重新排队,因为优先级较高的方面不会继续执行目标方法。
package de.scrum_master.spring.q67155048;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Configuration;
@SpringBootApplication
@Configuration
public class DemoApplication {
public static void main(String[] args) throws InterruptedException {
try (ConfigurableApplicationContext appContext = SpringApplication.run(DemoApplication.class, args)) {
doStuff(appContext);
}
}
private static void doStuff(ConfigurableApplicationContext appContext) throws InterruptedException {
MyComponent myComponent = appContext.getBean(MyComponent.class);
for (int i = 0; i < 10; i++) {
myComponent.submitDeletion();
Thread.sleep(500);
}
// This is just to make the application exit cleanly
PollableStreamListenerAspect.EXECUTOR_SERVICE.shutdown();
}
}
控制台日志:
. ____ _ __ _ _
/\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.1.8.RELEASE)
2021-04-20 12:56:03.675 INFO 13560 --- [ main] d.s.spring.q67155048.DemoApplication : Starting DemoApplication on Xander-Ultrabook with PID 13560 (C:\Users\alexa\Documents\java-src\spring-aop-playground\target\classes started by alexa in C:\Users\alexa\Documents\java-src\spring-aop-playground)
...
2021-04-20 12:56:07.666 INFO 13560 --- [ main] d.s.spring.q67155048.DemoApplication : Started DemoApplication in 4.65 seconds (JVM running for 7.181)
Receiving message
Logging
Starting processing #1
Receiving message
Re-queue
Finished processing #1
Receiving message
Logging
Starting processing #2
Receiving message
Re-queue
Finished processing #2
Receiving message
Logging
Starting processing #3
Receiving message
Re-queue
Finished processing #3
Receiving message
Logging
Starting processing #4
Receiving message
Re-queue
Finished processing #4
Receiving message
Logging
Starting processing #5
Receiving message
Re-queue
Finished processing #5
2021-04-20 12:56:12.767 INFO 13560 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Shutting down ExecutorService 'applicationTaskExecutor'
...
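See? We count 10x "Receiving message", but only 5x "Re-queue" and 5x "Logging", i.e. the logging advice only fires for the calls that actually proceed. Note that I numbered the processing calls because they run asynchronously. That makes it easier to follow when each one starts and finishes.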
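To make the 5/5 split easier to reason about, here is a minimal stand-alone sketch of the same gating idea without Spring. Everything in it is hypothetical (the class `BusyGateSketch`, its methods, the latches). It also deviates from the aspect in two ways: it claims the busy flag with `AtomicBoolean.compareAndSet` on the calling thread instead of setting a `volatile boolean` inside the worker, and it uses latches instead of `Thread.sleep` so that the outcome does not depend on timing.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the aspect's "busy" gate; not the code from the answer.
final class BusyGateSketch {
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private final AtomicBoolean busy = new AtomicBoolean(false);
    private final AtomicInteger processed = new AtomicInteger();
    private final AtomicInteger requeued = new AtomicInteger();

    // Returns a Future if the work was accepted, or null if it was "re-queued".
    Future<?> receive(Runnable work) {
        // Claim the gate atomically on the caller's thread.
        if (!busy.compareAndSet(false, true)) {
            requeued.incrementAndGet();
            return null;
        }
        return worker.submit(() -> {
            try {
                work.run();
                processed.incrementAndGet();
            } finally {
                busy.set(false); // runs before the Future completes
            }
        });
    }

    private static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Sends messages in pairs: the first is accepted, the second arrives while the worker is busy.
    static int[] simulate(int messages) throws Exception {
        BusyGateSketch gate = new BusyGateSketch();
        try {
            int sent = 0;
            while (sent < messages) {
                CountDownLatch release = new CountDownLatch(1);
                Future<?> accepted = gate.receive(() -> await(release));
                sent++;
                if (sent < messages) {
                    gate.receive(() -> { }); // worker still blocked, so this one is re-queued
                    sent++;
                }
                release.countDown(); // let the accepted message finish
                accepted.get();      // wait until the gate has been reset
            }
            return new int[] { gate.processed.get(), gate.requeued.get() };
        } finally {
            gate.worker.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        int[] result = simulate(10);
        System.out.println("processed=" + result[0] + ", requeued=" + result[1]);
    }
}
```

With 10 messages this prints `processed=5, requeued=5`, mirroring the counts in the log above. Any advice that sits behind the gate, such as the logging one, can only run for the accepted half.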
Update in response to the user's comment:
I updated my MCVE in order to reproduce your parameter binding problem. The new or changed files are:
package de.scrum_master.spring.q67155048;
public class Message<T> {
private T content;
public Message(T content) {
this.content = content;
}
@Override
public String toString() {
return "Message{" +
"content=" + content +
'}';
}
}
package de.scrum_master.spring.q67155048;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Configuration;
@SpringBootApplication
@Configuration
public class DemoApplication {
public static void main(String[] args) throws InterruptedException {
try (ConfigurableApplicationContext appContext = SpringApplication.run(DemoApplication.class, args)) {
doStuff(appContext);
}
}
private static void doStuff(ConfigurableApplicationContext appContext) throws InterruptedException {
MyComponent myComponent = appContext.getBean(MyComponent.class);
Message<String> message = new Message<>("Hi there!");
for (int i = 0; i < 10; i++) {
myComponent.submitDeletion(message);
Thread.sleep(500);
}
// This is just to make the application exit cleanly
PollableStreamListenerAspect.EXECUTOR_SERVICE.shutdown();
}
}
package de.scrum_master.spring.q67155048;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Aspect
@Component
@Order(value = Ordered.HIGHEST_PRECEDENCE)
public class PollableStreamListenerAspect {
public static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(1);
private volatile boolean paused = false;
@Around("@annotation(de.scrum_master.spring.q67155048.PollableStreamListener) && args(dataCapsule, ..)")
public void receiveMessage(ProceedingJoinPoint joinPoint, Object dataCapsule) throws Throwable {
System.out.println("Receiving message");
if (!paused) {
// The separate thread is not busy with a previous message, so process this message:
Runnable runnable = () -> {
try {
paused = true;
System.out.println("dataCapsule = " + dataCapsule);
joinPoint.proceed();
}
catch (Throwable throwable) {
throwable.printStackTrace();
}
finally {
paused = false;
}
};
EXECUTOR_SERVICE.submit(runnable);
}
else {
System.out.println(" Re-queue");
}
}
}
As you experienced yourself, this yields:
Exception in thread "main" java.lang.IllegalStateException: Required to bind 2 arguments, but only bound 1 (JoinPointMatch was NOT bound in invocation)
at org.springframework.aop.aspectj.AbstractAspectJAdvice.argBinding(AbstractAspectJAdvice.java:605)
at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethod(AbstractAspectJAdvice.java:633)
at org.springframework.aop.aspectj.AspectJAroundAdvice.invoke(AspectJAroundAdvice.java:70)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:175)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:688)
at de.scrum_master.spring.q67155048.MyComponent$$EnhancerBySpringCGLIB$baa410d.submitDeletion(<generated>)
at de.scrum_master.spring.q67155048.DemoApplication.doStuff(DemoApplication.java:21)
at de.scrum_master.spring.q67155048.DemoApplication.main(DemoApplication.java:13)
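You are hitting this problem.
For now, your workaround is not to use the elegant AOP argument binding, but to fetch the argument manually via JoinPoint.getArgs():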
package de.scrum_master.spring.q67155048;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Aspect
@Component
@Order(value = Ordered.HIGHEST_PRECEDENCE)
public class PollableStreamListenerAspect {
public static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(1);
private volatile boolean paused = false;
//@Around("@annotation(de.scrum_master.spring.q67155048.PollableStreamListener) && args(dataCapsule, ..)")
@Around("@annotation(de.scrum_master.spring.q67155048.PollableStreamListener) && execution(* *(*, ..))")
//public void receiveMessage(ProceedingJoinPoint joinPoint, Object dataCapsule) throws Throwable {
public void receiveMessage(ProceedingJoinPoint joinPoint) throws Throwable {
Object dataCapsule = joinPoint.getArgs()[0];
System.out.println("Receiving message");
if (!paused) {
// The separate thread is not busy with a previous message, so process this message:
Runnable runnable = () -> {
try {
paused = true;
System.out.println("dataCapsule = " + dataCapsule);
joinPoint.proceed();
}
catch (Throwable throwable) {
throwable.printStackTrace();
}
finally {
paused = false;
}
};
EXECUTOR_SERVICE.submit(runnable);
}
else {
System.out.println(" Re-queue");
}
}
}
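A side note on the workaround above: `getArgs()` hands back a plain `Object[]`, so the advice loses the type check that `args(dataCapsule, ..)` would have provided. The pointcut `execution(* *(*, ..))` still guarantees at least one parameter, but a small guard can make the lookup explicit. The helper below is a hypothetical sketch (`AdviceArgs` and `argAt` are not part of Spring or AspectJ). It only uses the JDK, with an `Object[]` standing in for the result of `joinPoint.getArgs()`.

```java
import java.util.Optional;

// Hypothetical helper for reading advice arguments defensively; not a Spring or AspectJ API.
final class AdviceArgs {
    private AdviceArgs() { }

    // Returns args[index] cast to the given type, or empty if it is missing, null or of another type.
    static <T> Optional<T> argAt(Object[] args, int index, Class<T> type) {
        if (args == null || index < 0 || index >= args.length) {
            return Optional.empty();
        }
        Object candidate = args[index];
        return type.isInstance(candidate) ? Optional.of(type.cast(candidate)) : Optional.empty();
    }

    public static void main(String[] unused) {
        Object[] args = { "Hi there!", 42 }; // stands in for joinPoint.getArgs()
        System.out.println(argAt(args, 0, String.class));          // Optional[Hi there!]
        System.out.println(argAt(args, 0, Integer.class));         // Optional.empty
        System.out.println(argAt(new Object[0], 0, String.class)); // Optional.empty
    }
}
```

Inside the advice this could be used as `AdviceArgs.argAt(joinPoint.getArgs(), 0, Message.class)`, falling back to a plain `joinPoint.proceed()` when the result is empty. Whether that fallback is appropriate depends on the application.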
With the getArgs() workaround in the aspect, the application works again, like this:
Receiving message
dataCapsule = Message{content=Hi there!}
Logging
Starting processing #1, message = Message{content=Hi there!}
Receiving message
Re-queue
Receiving message
Re-queue
Finished processing #1, message = Message{content=Hi there!}
Receiving message
dataCapsule = Message{content=Hi there!}
Logging
Starting processing #2, message = Message{content=Hi there!}
Receiving message
Re-queue
Finished processing #2, message = Message{content=Hi there!}
Receiving message
dataCapsule = Message{content=Hi there!}
Logging
Starting processing #3, message = Message{content=Hi there!}
Receiving message
Re-queue
Finished processing #3, message = Message{content=Hi there!}
Receiving message
dataCapsule = Message{content=Hi there!}
Logging
Starting processing #4, message = Message{content=Hi there!}
Receiving message
Re-queue
Finished processing #4, message = Message{content=Hi there!}
Receiving message
dataCapsule = Message{content=Hi there!}
Logging
Starting processing #5, message = Message{content=Hi there!}