向邮件入站通道适配器添加建议

Adding advice to a mail inbound channel adapter

我正在对 spring 集成流程进行集成测试,该流程从邮件入站通道适配器开始。我将测试电子邮件发送到模拟 GreenMail 电子邮件服务器,然后测试预期结果。但是因为邮件是异步的,所以目前只有在发送邮件后等待,直到流程完成,测试才能通过。

这是邮件适配器配置:

<int-mail:inbound-channel-adapter id="imapAdapter"
      store-uri="#{mailConnectionString}"
      java-mail-properties="javaMailProperties" channel="inboundChannel"
      should-delete-messages="false" should-mark-messages-as-read="true"
      auto-startup="true">
    <int:poller id="emailPoller" max-messages-per-poll="1" fixed-rate="5000">
    </int:poller>
</int-mail:inbound-channel-adapter>

所以,参考这个:Adding Completion Advice,我想我可以简单地等待完成建议,然后继续测试。但是不能给邮件适配器添加通知:

Caused by: org.springframework.beans.NotWritablePropertyException: Invalid property 'adviceChain' of bean class [org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean]: Bean property 'adviceChain' is not writable or has an invalid setter method. Does the parameter type of the setter match the return type of the getter?

还尝试了回复生成处理程序(来自上面 link),但没有找到 bean。

所以。如何向入站邮件适配器添加建议?或者是否有更好的方法在整个流程完成后测试邮件适配器?

回答建议后更新 我更改了测试以在设置中添加建议。

@Autowired
private SourcePollingChannelAdapter emailAdapter;

private MyAdvice imapAdapterCompletionAdvice;

@Before
public void setup() throws Exception
{
    imapAdapterCompletionAdvice = new MyAdvice();
    List<Advice> theAdvice = new ArrayList<Advice>();
    theAdvice.add(imapAdapterCompletionAdvice);
    emailAdapter.setAdviceChain(theAdvice);
    emailAdapter.start();           
} 

但是建议没有调用。我错过了什么吗?

这是建议class:

public class MyAdvice implements MethodInterceptor {

    private final CountDownLatch latch = new CountDownLatch(1);

    public Object invoke(MethodInvocation invocation) throws Throwable {
        Object proceed = invocation.proceed();
        System.out.println(proceed);
        if (proceed instanceof Boolean) {
            Boolean mailReceived = (Boolean) proceed;
            if(mailReceived){
                latch.countDown();
            }
        }

        return proceed;
    }

    public CountDownLatch getLatch() {
        return latch;
    }
}

我不完全清楚您要做什么,但入站通道适配器不是消息处理程序(因此没有处理程序 bean)。轮询的入站适配器是 MessageSource,但通知源无济于事,因为我们调用 receive(),然后将消息发送到流。

但是,您可以将 Advice 个对象添加到 <poller/>advice-chain

around 建议将覆盖 receive()(来自源)和 send() 到通道,因此您可以在那里暂停线程。

您可以直接在轮询器上配置建议。 如果您想以编程方式执行此操作,adviceChain 属性 在工厂 bean 的 pollerMetadata 字段上。

编辑

我不会再为此使用 BFPP - question/answer 已经过时了;我们现在将处理程序公开为 bean 名称 id.handler(消息源也类似),因此不再需要 BFPP

等待创建 bean 比尝试将 属性 注入 bean 定义要容易得多。

我会做这样的事情...

  1. 在测试用例中将自动启动设置为 false(使用 属性 占位符,因此对于生产为真,对于测试为假)。
  2. 注入建议链。
  3. 启动适配器...

.

@Autowired
private SourcePollingChannelAdapter adapter;

...

@Test
public ... {

    this.adapter.setAdviceChain(...);
    this.adapter.start();
    ...
}

如果您不想使用此技术,请使用 BeanPostProcessorpostProcessAfterInitialization - 在工厂 bean 提供通道适配器之后)而不是 BeanFactoryPostProcessor修改建议链。

您是正确的,即使没有投票结果也会调用建议。

您可以使用另一个建议(AbstractMessageSourceAdvice 的子类 - see Smart Polling

此建议仅建议 receive() 方法,并且可以判断轮询结果是否为消息;然后它可以在处理完消息后启动您的其他建议。

EDIT2

需要重置 initialized 标志,以便重新应用建议。这可以使用反射来完成(Spring 有一个方便的 DirectFieldAccessor)。

如果您不习惯使用反射,您可以 start/stop/start 它会做同样的事情,但我们需要确保第一个 start 实际上不会触发轮询。

反射示例:

@Autowired
private SourcePollingChannelAdapter adapter;

@Test
public void testAdvice() throws Exception {
    List<Advice> adviceChain = new ArrayList<Advice>();
    final AtomicBoolean hasMessage = new AtomicBoolean();
    final CountDownLatch latch = new CountDownLatch(1);
    class MessageDetector extends AbstractMessageSourceAdvice {

        @Override
        public boolean beforeReceive(MessageSource<?> source) {
            return true;
        }

        @Override
        public Message<?> afterReceive(Message<?> result, MessageSource<?> source) {
            hasMessage.set(result != null);
            System.out.println("has message:" + hasMessage.get());
            return result;
        }

    }
    adviceChain.add(new MessageDetector());
    class MyAdvice implements MethodInterceptor {

        @Override
        public Object invoke(MethodInvocation invocation) throws Throwable {
            System.out.println("in myAdvice before, hasmessage:" + hasMessage.get());
            Object proceed = invocation.proceed();
            System.out.println("in myAdvice after, hasmessage:" + hasMessage.get());
            latch.countDown();
            return proceed;
        }

    }
    adviceChain.add(new MyAdvice());
    adapter.setAdviceChain(adviceChain);
    new DirectFieldAccessor(adapter).setPropertyValue("initialized", false);
    adapter.start();
    assertTrue(latch.await(10, TimeUnit.SECONDS));
}

触发器操作示例...

@Autowired
private SourcePollingChannelAdapter adapter;

@Test
public void testAdvice() throws Exception {
    List<Advice> adviceChain = new ArrayList<Advice>();
    final AtomicBoolean hasMessage = new AtomicBoolean();
    final CountDownLatch latch = new CountDownLatch(1);
    class MessageDetector extends AbstractMessageSourceAdvice {

        @Override
        public boolean beforeReceive(MessageSource<?> source) {
            return true;
        }

        @Override
        public Message<?> afterReceive(Message<?> result, MessageSource<?> source) {
            hasMessage.set(result != null);
            System.out.println("has message:" + hasMessage.get());
            return result;
        }

    }
    adviceChain.add(new MessageDetector());
    class MyAdvice implements MethodInterceptor {

        @Override
        public Object invoke(MethodInvocation invocation) throws Throwable {
            System.out.println("in myAdvice before, hasmessage:" + hasMessage.get());
            Object proceed = invocation.proceed();
            System.out.println("in myAdvice after, hasmessage:" + hasMessage.get());
            latch.countDown();
            return proceed;
        }

    }
    adviceChain.add(new MyAdvice());
    adapter.setAdviceChain(adviceChain);
    adapter.setTrigger(new Trigger() {

        @Override
        public Date nextExecutionTime(TriggerContext triggerContext) {
            return null; // never poll
        }
    });
    adapter.start();
    adapter.stop();
    adapter.setTrigger(new PeriodicTrigger(1000));
    adapter.start();
    assertTrue(latch.await(10, TimeUnit.SECONDS));
}