如何在 Spring-Integration 中重试整个轮询流程
how to retry an entire poller flow in Spring-Integration
我一直在纠结如何为 Spring-集成轮询器流程实施重试模式(重试整个流程)。
请在下面找到我的(错误的)源代码(不起作用)。
我做错了什么?
(如果我在抛出异常的那一行打断点,它只会命中一次)
非常感谢您的宝贵时间和专业知识。
此致
nkjp
PS:也许尝试使用 RetryTemplate 扩展 AbstractHandleMessageAdvice?
return IntegrationFLows.from(SOME_QUEUE_CHANNEL)
.transform(p -> p, e -> e.poller(Pollers.fixedDelay(5000)
.advice(RetryInterceptorBuilder.stateless().maxAttempts(5).backOffOptions(1,2,10).build())))
.transform(p -> {
if (true) {
throw new RuntimeException("KABOOM");
}
return p;
})
.channel(new NullChannel())
.get();
如果添加 poller.advice()
,则 Advice
将应用于从 poll()
方法开始的整个流程。由于您已经从该队列中轮询了一条消息,因此在下一次尝试时没有可从中轮询的内容。对非事务队列使用重试有点反模式:您不回滚事务,因此您的数据不会返回存储以供下一个 poll()
.
使用
目前无法从某个点重试整个子流程,但您绝对可以在特定的错误端点上使用 RequestHandlerRetryAdvice
,例如您的 transform()
和 KABOOM
异常:
.transform(p -> {
if (true) {
throw new RuntimeException("KABOOM");
}
return p;
}, e -> e.advice(new RequestHandlerRetryAdvice()))
查看其 setRetryTemplate(RetryTemplate retryTemplate)
以获得更多重试选项,而不是默认情况下仅尝试 3 次。
为了做一个子流程,我们需要考虑实现一个HandleMessageAdvice
。
像这样:
.transform(p -> p, e -> e.poller(Pollers.fixedDelay(500000))
.advice(new HandleMessageAdvice() {
RetryOperationsInterceptor delegate =
RetryInterceptorBuilder.stateless()
.maxAttempts(5)
.backOffOptions(1, 2, 10)
.build();
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
return delegate.invoke(invocation);
}
}))
但再次说明:这不是 poller
建议。它是 MessageHandler.handleMessage()
上的端点。
我一直在纠结如何为 Spring-集成轮询器流程实施重试模式(重试整个流程)。 请在下面找到我的(错误的)源代码(不起作用)。
我做错了什么?
(如果我在抛出异常的那一行打断点,它只会命中一次)
非常感谢您的宝贵时间和专业知识。
此致
nkjp
PS:也许尝试使用 RetryTemplate 扩展 AbstractHandleMessageAdvice?
return IntegrationFLows.from(SOME_QUEUE_CHANNEL)
.transform(p -> p, e -> e.poller(Pollers.fixedDelay(5000)
.advice(RetryInterceptorBuilder.stateless().maxAttempts(5).backOffOptions(1,2,10).build())))
.transform(p -> {
if (true) {
throw new RuntimeException("KABOOM");
}
return p;
})
.channel(new NullChannel())
.get();
如果添加 poller.advice()
,则 Advice
将应用于从 poll()
方法开始的整个流程。由于您已经从该队列中轮询了一条消息,因此在下一次尝试时没有可从中轮询的内容。对非事务队列使用重试有点反模式:您不回滚事务,因此您的数据不会返回存储以供下一个 poll()
.
目前无法从某个点重试整个子流程,但您绝对可以在特定的错误端点上使用 RequestHandlerRetryAdvice
,例如您的 transform()
和 KABOOM
异常:
.transform(p -> {
if (true) {
throw new RuntimeException("KABOOM");
}
return p;
}, e -> e.advice(new RequestHandlerRetryAdvice()))
查看其 setRetryTemplate(RetryTemplate retryTemplate)
以获得更多重试选项,而不是默认情况下仅尝试 3 次。
为了做一个子流程,我们需要考虑实现一个HandleMessageAdvice
。
像这样:
.transform(p -> p, e -> e.poller(Pollers.fixedDelay(500000))
.advice(new HandleMessageAdvice() {
RetryOperationsInterceptor delegate =
RetryInterceptorBuilder.stateless()
.maxAttempts(5)
.backOffOptions(1, 2, 10)
.build();
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
return delegate.invoke(invocation);
}
}))
但再次说明:这不是 poller
建议。它是 MessageHandler.handleMessage()
上的端点。