Advice precedence problem when one @Around advice does not proceed

Updated to rephrase the question with additional information

We have two annotations, CustomLogging and PollableStreamListener. Both are implemented as aspects using Spring AOP.

CustomLogging annotation:

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface CustomLogging {

}

CustomLoggingAspect class:

@Aspect
@Component
@Slf4j
@Order(value = 1)
public class CustomLoggingAspect {

  @Before("@annotation(customLogging)")
  public void addCustomLogging(CustomLogging customLogging) {
    log.info("Logging some information");
  }

}

PollableStreamListener annotation:

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface PollableStreamListener {

}

PollableStreamListenerAspect class:

@Aspect
@Component
@Slf4j
public class PollableStreamListenerAspect {

  private final ExecutorService executor = Executors.newFixedThreadPool(1);

  private volatile boolean paused = false;

  @Around(value = "@annotation(pollableStreamListener) && args(dataCapsule,..)")
  public void receiveMessage(ProceedingJoinPoint joinPoint,
      PollableStreamListener pollableStreamListener, Object dataCapsule) {
    if (dataCapsule instanceof Message) {
      Message<?> message = (Message<?>) dataCapsule;
      AcknowledgmentCallback callback = StaticMessageHeaderAccessor
          .getAcknowledgmentCallback(message);
      callback.noAutoAck();

      if (!paused) {
        // The separate thread is not busy with a previous message, so process this message:
        Runnable runnable = () -> {
          try {
            paused = true;

            // Call method to process this Kafka message
            joinPoint.proceed();

            callback.acknowledge(Status.ACCEPT);
          } catch (Throwable e) {
            callback.acknowledge(Status.REJECT);
            throw new PollableStreamListenerException(e);
          } finally {
            paused = false;
          }
        };

        executor.submit(runnable);
      } else {
        // The separate thread is busy with a previous message, so re-queue this message for later:
        callback.acknowledge(Status.REQUEUE);
        log.info("Re-queue");
      }
    }
  }

}

We have a class called CleanupController that runs periodically on a schedule.

CleanupController class:

@Scheduled(fixedDelayString = "${app.pollable-consumer.time-interval}")
  public void pollForDeletionRequest() {
    log.trace("Polling for new messages");
    cleanupInput.poll(cleanupSubmissionService::submitDeletion);
  }

When the schedule fires, it calls a method in another class that is annotated with both PollableStreamListener and CustomLogging. I added a Thread.sleep() to mimic a method that takes a while to execute.

@PollableStreamListener
  @CustomLogging
  public void submitDeletion(Message<?> received) {
    try {
      log.info("Starting processing");
      Thread.sleep(10000);
      log.info("Finished processing");
    } catch (Exception e) {
      log.info("Error", e);
    }
  }

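The problem I am facing is that the output produced by CustomLogging is printed every time we poll for new messages using @Scheduled, but I only want it printed when the annotated method is actually executed (which may happen now or in the future, depending on whether another message is currently being processed). This leads to confusing log messages, because they imply that the message is being processed now when it has in fact been re-queued for later execution.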

Is there a way to make these annotations work well together, so that the CustomLogging output only appears when the annotated method is executed?


Update: using @Order on PollableStreamListener

Following @dunni's suggestion, I made the following changes to the original example above.

Set the order to 1 on PollableStreamListenerAspect:

@Aspect
@Component
@Slf4j
@Order(value = 1)
public class PollableStreamListenerAspect {
...
}

Increase the order to 2 on CustomLoggingAspect:

@Aspect
@Component
@Slf4j
@Order(value = 2)
public class CustomLoggingAspect {
...
}

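After making these changes, I found that polling no longer detects new requests at all. It is the change on PollableStreamListenerAspect that introduces this problem (I commented out that line, re-ran the application, and everything worked as before).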


Update: using @Order(value = Ordered.HIGHEST_PRECEDENCE) on PollableStreamListener

I have updated PollableStreamListenerAspect to use HIGHEST_PRECEDENCE and changed the @Around value:

@Aspect
@Component
@Slf4j
@Order(value = Ordered.HIGHEST_PRECEDENCE)
public class PollableStreamListenerAspect {

  private final ExecutorService executor = Executors.newFixedThreadPool(1);

  private volatile boolean paused = false;

  @Around(value = "@annotation(au.com.brolly.commons.stream.PollableStreamListener)")
  public void receiveMessage(ProceedingJoinPoint joinPoint) {
    if (!paused) {
      // The separate thread is not busy with a previous message, so process this message:
      Runnable runnable = () -> {
        try {
          paused = true;

          // Call method to process this Kafka message
          joinPoint.proceed();
        } catch (Throwable e) {
          e.printStackTrace();
          throw new PollableStreamListenerException(e);
        } finally {
          paused = false;
        }
      };

      executor.submit(runnable);
    } else {
      // The separate thread is busy with a previous message, so re-queue this message for later:
      log.info("Re-queue");
    }
  }
}

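This now partially works. When I send a Kafka message, it gets processed, and the logging from the CustomLogging annotation is only printed when no other Kafka message is being processed. So far so good.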

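The next challenge is to get the @Around advice to accept the Message provided via Kafka. I tried this by changing the following lines of the example above: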

  @Around(value = "@annotation(au.com.brolly.commons.stream.PollableStreamListener) && args(dataCapsule,..)")
  public void receiveMessage(ProceedingJoinPoint joinPoint, Object dataCapsule) {
...
}

The server starts normally, but when I publish a Kafka message I get the following exception:

2021-04-22 10:38:00,055 ERROR [scheduling-1] org.springframework.core.log.LogAccessor: org.springframework.messaging.MessageHandlingException: nested exception is java.lang.IllegalStateException: Required to bind 2 arguments, but only bound 1 (JoinPointMatch was NOT bound in invocation), failedMessage=GenericMessage...
    at org.springframework.cloud.stream.binder.DefaultPollableMessageSource.doHandleMessage(DefaultPollableMessageSource.java:330)
    at org.springframework.cloud.stream.binder.DefaultPollableMessageSource.handle(DefaultPollableMessageSource.java:361)
    at org.springframework.cloud.stream.binder.DefaultPollableMessageSource.poll(DefaultPollableMessageSource.java:219)
    at org.springframework.cloud.stream.binder.DefaultPollableMessageSource.poll(DefaultPollableMessageSource.java:200)
    at org.springframework.cloud.stream.binder.DefaultPollableMessageSource.poll(DefaultPollableMessageSource.java:68)
    at xyx.pollForDeletionRequest(CleanupController.java:35)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.IllegalStateException: Required to bind 2 arguments, but only bound 1 (JoinPointMatch was NOT bound in invocation)
    at org.springframework.aop.aspectj.AbstractAspectJAdvice.argBinding(AbstractAspectJAdvice.java:596)
    at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethod(AbstractAspectJAdvice.java:624)
    at org.springframework.aop.aspectj.AspectJAroundAdvice.invoke(AspectJAroundAdvice.java:72)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:175)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692)
    at xyz.CleanupSubmissionServiceImpl$$EnhancerBySpringCGLIB$37f6f8.submitDeletion(<generated>)
    at org.springframework.cloud.stream.binder.DefaultPollableMessageSource.doHandleMessage(DefaultPollableMessageSource.java:327)
    ... 17 more

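You can add the @Order annotation to both aspects to define their order. If it is not provided, the execution order of the aspects is undefined. For example, if the logging aspect should always run as the last advice, no matter what other advices exist, give it an order of e.g. 10000 and give all other advices lower numbers (lower numbers are executed first).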

Documentation: https://docs.spring.io/spring-framework/docs/current/reference/html/core.html#aop-ataspectj-advice-ordering
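
To make the ordering rule concrete, here is a small plain-Java model of an advice chain. It is only a sketch of the intended semantics, not Spring's implementation: the names AdviceOrderSketch, Advice, OrderedAdvice and call are made up for illustration, and the "gate" stands in for an @Around advice that proceeds only when the worker is idle.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Plain-Java model of an ordered advice chain; not Spring's actual implementation.
class AdviceOrderSketch {

  // Hypothetical stand-in for an advice: it may call next.run() (proceed) or skip it.
  interface Advice {
    void invoke(Runnable next, List<String> trace);
  }

  // Pairs an advice with its order value, similar to @Order on an aspect class.
  static final class OrderedAdvice {
    final int order;
    final Advice advice;

    OrderedAdvice(int order, Advice advice) {
      this.order = order;
      this.advice = advice;
    }
  }

  // Runs one call through both advices and returns what happened, in order.
  static List<String> call(boolean busy, int gateOrder, int loggingOrder) {
    List<String> trace = new ArrayList<>();

    // The gate proceeds only when the worker is idle, otherwise it re-queues.
    Advice gate = (next, t) -> {
      if (busy) {
        t.add("re-queue");
      } else {
        next.run();
      }
    };
    // The logging advice always logs and then proceeds.
    Advice logging = (next, t) -> {
      t.add("logging");
      next.run();
    };

    List<OrderedAdvice> chain = new ArrayList<>();
    chain.add(new OrderedAdvice(gateOrder, gate));
    chain.add(new OrderedAdvice(loggingOrder, logging));
    // Sort descending so that wrapping starts with the innermost advice;
    // the advice with the lowest order value ends up as the outermost wrapper.
    chain.sort(Comparator.comparingInt((OrderedAdvice a) -> a.order).reversed());

    Runnable invocation = () -> trace.add("target");
    for (OrderedAdvice current : chain) {
      Runnable next = invocation;
      invocation = () -> current.advice.invoke(next, trace);
    }
    invocation.run();
    return trace;
  }
}
```

In this model, when the gate has the lower order value, a busy call records only "re-queue". With the order reversed, "logging" is recorded even though the target never runs, which is the confusing log output described in the question.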

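Because of this problem, you need to use @Order(value = Ordered.HIGHEST_PRECEDENCE) on PollableStreamListenerAspect. It is weird indeed, but then it works as you wish. IMO the problem should be fixed in Spring, though. Having to use this workaround is ugly, and it only works if the aspect calling proceed() asynchronously really has the highest precedence, which is not always the case. As an alternative, you could use native AspectJ and its own concept of declaring advice precedence, which is independent of Spring internals.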
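
As background on why calling proceed() from another thread can be fragile: my assumption (not confirmed anywhere in this thread) is that Spring keeps the current invocation in thread-local state, which a worker thread cannot see. The following plain-Java sketch only demonstrates that general ThreadLocal behaviour. ThreadLocalSketch and CURRENT_INVOCATION are hypothetical names, not Spring classes.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadLocalSketch {
  // Hypothetical stand-in for per-thread invocation context.
  static final ThreadLocal<String> CURRENT_INVOCATION = new ThreadLocal<>();

  // Sets the value on the calling thread and reads it back on a worker thread.
  static String readOnWorkerThread() {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      CURRENT_INVOCATION.set("submitDeletion");
      // The worker thread has its own, empty slot for the ThreadLocal,
      // so get() returns null there and String.valueOf turns it into "null".
      return executor.submit(() -> String.valueOf(CURRENT_INVOCATION.get())).get();
    } catch (Exception e) {
      throw new IllegalStateException(e);
    } finally {
      CURRENT_INVOCATION.remove();
      executor.shutdown();
    }
  }
}
```

Anything further down the chain that relies on such per-thread state would find it missing on the executor thread.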

Here is a simplified version of your application as an MCVE:

Annotations:

package de.scrum_master.spring.q67155048;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface CustomLogging {}

package de.scrum_master.spring.q67155048;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface PollableStreamListener {}

Component with a method carrying both annotations:

package de.scrum_master.spring.q67155048;

import org.springframework.stereotype.Component;

@Component
public class MyComponent {
  private int counter = 0;

  @PollableStreamListener
  @CustomLogging
  public void submitDeletion() {
    try {
      System.out.println("  Starting processing #" + ++counter);
      Thread.sleep(1000);
      System.out.println("  Finished processing #" + counter);
    }
    catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}

Aspects:

package de.scrum_master.spring.q67155048;

import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.springframework.stereotype.Component;

@Aspect
@Component
public class CustomLoggingAspect {
  @Before("@annotation(de.scrum_master.spring.q67155048.CustomLogging)")
  public void addCustomLogging() {
    System.out.println("Logging");
  }
}

package de.scrum_master.spring.q67155048;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Aspect
@Component
@Order(value = Ordered.HIGHEST_PRECEDENCE)
public class PollableStreamListenerAspect {
  public static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(1);
  private volatile boolean paused = false;

  @Around("@annotation(de.scrum_master.spring.q67155048.PollableStreamListener)")
  public void receiveMessage(ProceedingJoinPoint joinPoint) throws Throwable {
    System.out.println("Receiving message");
    if (!paused) {
      // The separate thread is not busy with a previous message, so process this message:
      Runnable runnable = () -> {
        try {
          paused = true;
          joinPoint.proceed();
        }
        catch (Throwable throwable) {
          throwable.printStackTrace();
        }
        finally {
          paused = false;
        }
      };
      EXECUTOR_SERVICE.submit(runnable);
    }
    else {
      System.out.println("  Re-queue");
    }
  }
}

Driver application:

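The application calls the target method every 500 ms, but execution takes 1,000 ms. So in this case we expect to see about 50% of the calls being re-queued without any logging, because the higher-precedence aspect does not proceed to the target method.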

package de.scrum_master.spring.q67155048;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Configuration;

@SpringBootApplication
@Configuration
public class DemoApplication {
  public static void main(String[] args) throws InterruptedException {
    try (ConfigurableApplicationContext appContext = SpringApplication.run(DemoApplication.class, args)) {
      doStuff(appContext);
    }
  }

  private static void doStuff(ConfigurableApplicationContext appContext) throws InterruptedException {
    MyComponent myComponent = appContext.getBean(MyComponent.class);
    for (int i = 0; i < 10; i++) {
      myComponent.submitDeletion();
      Thread.sleep(500);
    }
    // This is just to make the application exit cleanly
    PollableStreamListenerAspect.EXECUTOR_SERVICE.shutdown();
  }
}
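
The expected 50% ratio can be checked with a few lines of arithmetic. This is an idealized sketch that ignores scheduling jitter and assumes the worker is free again exactly when the processing time has elapsed. PollingRatioSketch and simulate are names invented for this illustration.

```java
class PollingRatioSketch {
  // Returns {processed, requeued} for calls made at a fixed interval
  // against a single worker that is busy for processingMillis per message.
  static int[] simulate(int calls, int intervalMillis, int processingMillis) {
    int processed = 0;
    int requeued = 0;
    long busyUntil = 0; // point in time at which the worker becomes free again
    for (int i = 0; i < calls; i++) {
      long now = (long) i * intervalMillis;
      if (now >= busyUntil) {
        processed++;
        busyUntil = now + processingMillis;
      } else {
        requeued++;
      }
    }
    return new int[] {processed, requeued};
  }

  public static void main(String[] args) {
    int[] result = simulate(10, 500, 1000);
    // prints "processed=5, requeued=5"
    System.out.println("processed=" + result[0] + ", requeued=" + result[1]);
  }
}
```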

Console log:

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.1.8.RELEASE)

2021-04-20 12:56:03.675  INFO 13560 --- [           main] d.s.spring.q67155048.DemoApplication     : Starting DemoApplication on Xander-Ultrabook with PID 13560 (C:\Users\alexa\Documents\java-src\spring-aop-playground\target\classes started by alexa in C:\Users\alexa\Documents\java-src\spring-aop-playground)
...
2021-04-20 12:56:07.666  INFO 13560 --- [           main] d.s.spring.q67155048.DemoApplication     : Started DemoApplication in 4.65 seconds (JVM running for 7.181)
Receiving message
Logging
  Starting processing #1
Receiving message
  Re-queue
  Finished processing #1
Receiving message
Logging
  Starting processing #2
Receiving message
  Re-queue
  Finished processing #2
Receiving message
Logging
  Starting processing #3
Receiving message
  Re-queue
  Finished processing #3
Receiving message
Logging
  Starting processing #4
Receiving message
  Re-queue
  Finished processing #4
Receiving message
Logging
  Starting processing #5
Receiving message
  Re-queue
  Finished processing #5
2021-04-20 12:56:12.767  INFO 13560 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Shutting down ExecutorService 'applicationTaskExecutor'
...

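See? We count 10x "Receiving message", but only 5x "Re-queue" and 5x "Logging". Note that I numbered the processing calls because they are asynchronous. That way it is easier to follow when they start and finish.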
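
One caveat about the aspect: the paused flag is only set to true inside the Runnable, so two calls arriving in quick succession could both see paused == false before the worker thread starts. A possible way to close that window is to claim the gate atomically before submitting the task. The following is a minimal sketch under that assumption. BusyGateSketch, tryAcquire and release are hypothetical names and not part of the MCVE.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class BusyGateSketch {
  private final AtomicBoolean busy = new AtomicBoolean(false);

  // Atomically claims the gate; only one caller can win until release() is called.
  boolean tryAcquire() {
    return busy.compareAndSet(false, true);
  }

  // Frees the gate again, e.g. from the finally block of the submitted task.
  void release() {
    busy.set(false);
  }
}
```

In the advice, tryAcquire() would replace the !paused check and release() would take the place of paused = false in the finally block.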


Update in response to user comment:

I updated my MCVE to reproduce your parameter binding problem. The new or changed files are:

package de.scrum_master.spring.q67155048;

public class Message<T> {
  private T content;

  public Message(T content) {
    this.content = content;
  }

  @Override
  public String toString() {
    return "Message{" +
      "content=" + content +
      '}';
  }
}

package de.scrum_master.spring.q67155048;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Configuration;

@SpringBootApplication
@Configuration
public class DemoApplication {
  public static void main(String[] args) throws InterruptedException {
    try (ConfigurableApplicationContext appContext = SpringApplication.run(DemoApplication.class, args)) {
      doStuff(appContext);
    }
  }

  private static void doStuff(ConfigurableApplicationContext appContext) throws InterruptedException {
    MyComponent myComponent = appContext.getBean(MyComponent.class);
    Message<String> message = new Message<>("Hi there!");
    for (int i = 0; i < 10; i++) {
      myComponent.submitDeletion(message);
      Thread.sleep(500);
    }
    // This is just to make the application exit cleanly
    PollableStreamListenerAspect.EXECUTOR_SERVICE.shutdown();
  }
}

package de.scrum_master.spring.q67155048;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Aspect
@Component
@Order(value = Ordered.HIGHEST_PRECEDENCE)
public class PollableStreamListenerAspect {
  public static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(1);
  private volatile boolean paused = false;

  @Around("@annotation(de.scrum_master.spring.q67155048.PollableStreamListener) && args(dataCapsule, ..)")
  public void receiveMessage(ProceedingJoinPoint joinPoint, Object dataCapsule) throws Throwable {
    System.out.println("Receiving message");
    if (!paused) {
      // The separate thread is not busy with a previous message, so process this message:
      Runnable runnable = () -> {
        try {
          paused = true;
          System.out.println("dataCapsule = " + dataCapsule);
          joinPoint.proceed();
        }
        catch (Throwable throwable) {
          throwable.printStackTrace();
        }
        finally {
          paused = false;
        }
      };
      EXECUTOR_SERVICE.submit(runnable);
    }
    else {
      System.out.println("  Re-queue");
    }
  }
}

Consistent with your own experience, this yields:

Exception in thread "main" java.lang.IllegalStateException: Required to bind 2 arguments, but only bound 1 (JoinPointMatch was NOT bound in invocation)
    at org.springframework.aop.aspectj.AbstractAspectJAdvice.argBinding(AbstractAspectJAdvice.java:605)
    at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethod(AbstractAspectJAdvice.java:633)
    at org.springframework.aop.aspectj.AspectJAroundAdvice.invoke(AspectJAroundAdvice.java:70)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:175)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:688)
    at de.scrum_master.spring.q67155048.MyComponent$$EnhancerBySpringCGLIB$baa410d.submitDeletion(<generated>)
    at de.scrum_master.spring.q67155048.DemoApplication.doStuff(DemoApplication.java:21)
    at de.scrum_master.spring.q67155048.DemoApplication.main(DemoApplication.java:13)

You are hitting a Spring AOP problem, and I have commented on the closed Spring issue #16956, hoping to get a reply there and that someone fixes it.

For now, your workaround is not to use the elegant AOP argument binding, but to fetch the argument manually using JoinPoint.getArgs():

package de.scrum_master.spring.q67155048;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Aspect
@Component
@Order(value = Ordered.HIGHEST_PRECEDENCE)
public class PollableStreamListenerAspect {
  public static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(1);
  private volatile boolean paused = false;

  //@Around("@annotation(de.scrum_master.spring.q67155048.PollableStreamListener) && args(dataCapsule, ..)")
  @Around("@annotation(de.scrum_master.spring.q67155048.PollableStreamListener) && execution(* *(*, ..))")
  //public void receiveMessage(ProceedingJoinPoint joinPoint, Object dataCapsule) throws Throwable {
  public void receiveMessage(ProceedingJoinPoint joinPoint) throws Throwable {
    Object dataCapsule = joinPoint.getArgs()[0];
    System.out.println("Receiving message");
    if (!paused) {
      // The separate thread is not busy with a previous message, so process this message:
      Runnable runnable = () -> {
        try {
          paused = true;
          System.out.println("dataCapsule = " + dataCapsule);
          joinPoint.proceed();
        }
        catch (Throwable throwable) {
          throwable.printStackTrace();
        }
        finally {
          paused = false;
        }
      };
      EXECUTOR_SERVICE.submit(runnable);
    }
    else {
      System.out.println("  Re-queue");
    }
  }
}
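
Because getArgs() returns a plain Object[], the type safety of args(dataCapsule, ..) is lost. If you want to keep something like the instanceof check from the original aspect, a small helper could do the lookup defensively. This is just a sketch. FirstArgSketch and firstArgAs are made-up names, and in the advice you would pass joinPoint.getArgs() together with your message type.

```java
import java.util.Optional;

class FirstArgSketch {
  // Returns the first argument if it exists and has the expected type, otherwise empty.
  static <T> Optional<T> firstArgAs(Object[] args, Class<T> type) {
    if (args == null || args.length == 0 || !type.isInstance(args[0])) {
      return Optional.empty();
    }
    return Optional.of(type.cast(args[0]));
  }
}
```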

Now it works again, like this:

Receiving message
dataCapsule = Message{content=Hi there!}
Logging
  Starting processing #1, message = Message{content=Hi there!}
Receiving message
  Re-queue
Receiving message
  Re-queue
  Finished processing #1, message = Message{content=Hi there!}
Receiving message
dataCapsule = Message{content=Hi there!}
Logging
  Starting processing #2, message = Message{content=Hi there!}
Receiving message
  Re-queue
  Finished processing #2, message = Message{content=Hi there!}
Receiving message
dataCapsule = Message{content=Hi there!}
Logging
  Starting processing #3, message = Message{content=Hi there!}
Receiving message
  Re-queue
  Finished processing #3, message = Message{content=Hi there!}
Receiving message
dataCapsule = Message{content=Hi there!}
Logging
  Starting processing #4, message = Message{content=Hi there!}
Receiving message
  Re-queue
  Finished processing #4, message = Message{content=Hi there!}
Receiving message
dataCapsule = Message{content=Hi there!}
Logging
  Starting processing #5, message = Message{content=Hi there!}