当 spring 集成配置包含优先通道时如何 re-queue 消息
How to re-queue message when spring integration configuration includes a priority channel
我有一个使用优先通道的 Spring 集成配置。当从该通道读取一个项目时,会在那个时间点检查本地资源,如果资源无法处理该项目,我想重新queue消息以便另一台机器接收它.最初,我错误地抛出了一个异常,认为会发生 requeue,但是 这是行不通的,因为优先级通道在侦听器容器之外的另一个线程中执行。
我考虑过在入站通道适配器之后放置一个过滤器,如果当时资源不可用则抛出异常,但在那个时候无法对资源进行准确评估,因为当时资源可用性当根据优先级选择消息时,时间确实与可用时间匹配。
我的下一个想法是在优先通道之后和服务激活器之前放置一个过滤器,并将当前资源无法处理的消息直接发送到 discard-channel,它被定义为一个出站通道适配器,它发送消息回到原来的queue。这种方法有缺陷吗?
编辑 20150917:
根据 Gary 的建议,我已转移到 RabbitMQ 3.5.x 以便获得 built-in 优先级 queue。我现在在跟踪尝试次数时遇到问题,因为我的原始消息似乎被放回了 queue,而不是我修改后的消息。我已更新代码块以反映当前设置。
编辑 20150922:
我正在更新此 post 以反映我创建的概念代码库的最终证明。无论如何,我都不是 Spring-Integration 专家,所以请记住这一点以及此测试代码尚未准备好生产的事实。我的初衷是在抛出特定异常时重新提交消息并重试一定次数。这可以使用 StatefulRetryOperationsInterceptor
来完成。但是为了进一步试验,我希望能够在失败时 set/increment a header 然后在我的流程中有一些东西可以对该值做出反应。这是通过使用覆盖 additionalHeaders()
的 RepublishMessageRecoverer
的扩展来实现的。这个 object 然后用于配置 RetryOperationsInterceptor
。
另一件小事:当我的信号异常被抛出时,我想减少一些默认的 Spring 集成日志记录,所以我需要确保我按顺序命名我的错误通道 "errorChannel"替换 Spring 集成默认值。我还需要创建一个自定义 ErrorHandler
分配给 ListenerContainer
默认值,它将所有内容记录到 ERROR 级别。
这是我当前的设置:
Spring 集成 4.2.0.RELEASE
Spring AMQP 1.5.0.RELEASE
RabbitMQ 3.5.x
配置
@Autowired
public void setSpringIntegrationConfigHelper (SpringIntegrationHelper springIntegrationConfigHelper) {
this.springIntegrationConfigHelper = springIntegrationConfigHelper;
}
@Bean
public String priorityPOCQueueName() {
return "poc.priority";
}
@Bean
public Queue priorityPOCQueue(RabbitAdmin rabbitAdmin) {
boolean durable = true;
boolean exclusive = false;
boolean autoDelete = false;
//Adding the x-max-priority argument is what signals RabbitMQ that this is a priority queue. Must be Rabbit 3.5.x
Map<String,Object> arguments = new HashMap<String, Object>();
arguments.put("x-max-priority", 5);
Queue queue = new Queue(priorityPOCQueueName(),
durable,
exclusive,
autoDelete,
arguments);
rabbitAdmin.declareQueue(queue);
return queue;
}
@Bean
public Binding priorityPOCQueueBinding(RabbitAdmin rabbitAdmin) {
Binding binding = new Binding(priorityPOCQueueName(),
DestinationType.QUEUE,
"amq.direct",
priorityPOCQueue(rabbitAdmin).getName(),
null);
rabbitAdmin.declareBinding(binding);
return binding;
}
@Bean
public AmqpTemplate priorityPOCMessageTemplate(ConnectionFactory amqpConnectionFactory,
@Qualifier("priorityPOCQueueName") String queueName,
@Qualifier("jsonMessageConverter") MessageConverter messageConverter) {
RabbitTemplate template = new RabbitTemplate(amqpConnectionFactory);
template.setChannelTransacted(false);
template.setExchange("amq.direct");
template.setQueue(queueName);
template.setRoutingKey(queueName);
template.setMessageConverter(messageConverter);
return template;
}
@Autowired
@Qualifier("priorityPOCQueue")
public void setPriorityPOCQueue(Queue priorityPOCQueue) {
this.priorityPOCQueue = priorityPOCQueue;
}
@Bean
public MessageRecoverer miTestMessageRecoverer(final AmqpTemplate priorityPOCMessageTemplate) {
return new MessageRecoverer() {
@Override
public void recover(org.springframework.amqp.core.Message msg, Throwable t) {
StringBuilder sb = new StringBuilder();
sb.append("Firing Test Recoverer: ").append(t.getClass().getName()).append(" Message Count: ")
.append(msg.getMessageProperties().getMessageCount())
.append(" ID: ").append(msg.getMessageProperties().getMessageId())
.append(" DeliveryTag: ").append(msg.getMessageProperties().getDeliveryTag())
.append(" Redilivered: ").append(msg.getMessageProperties().isRedelivered());
logger.debug(sb.toString());
PriorityMessage m = new PriorityMessage(5);
m.setId(randomGenerator.nextLong(10L, 1000000L));
priorityPOCMessageTemplate.convertAndSend(m , new SimulateErrorHeaderPostProcessor(Boolean.FALSE, m.getPriority()));
}
};
}
@Bean
public RepublishMessageRecoverer miRepublishRecoverer(final AmqpTemplate priorityPOCMessageTemplate) {
class MiRecoverer extends RepublishMessageRecoverer {
public MiRecoverer(AmqpTemplate errorTemplate) {
super(errorTemplate);
this.setErrorRoutingKeyPrefix("");
}
@Override
protected Map<? extends String, ? extends Object> additionalHeaders(
org.springframework.amqp.core.Message message, Throwable cause) {
Map<String, Object> map = new HashMap<>();
if (message.getMessageProperties().getHeaders().containsKey("jmattempts") == false) {
map.put("jmattempts", 0);
} else {
Integer count = Integer.valueOf(message.getMessageProperties().getHeaders().get("jmattempts").toString());
map.put("jmattempts", ++count);
}
return map;
}
} ;
return new MiRecoverer(priorityPOCMessageTemplate);
}
@Bean
public StatefulRetryOperationsInterceptor inadequateResourceInterceptor(@Qualifier("priorityPOCMessageTemplate") AmqpTemplate priorityPOCMessageTemplate
, @Qualifier("priorityMessageKeyGenerator") PriorityMessageKeyGenerator priorityMessageKeyGenerator
, @Qualifier("miTestMessageRecoverer") MessageRecoverer messageRecoverer
, @Qualifier("miRepublishRecoverer") RepublishMessageRecoverer miRepublishRecoverer) {
StatefulRetryInterceptorBuilder b = RetryInterceptorBuilder.stateful();
return b.maxAttempts(2)
.backOffOptions(2000L, 1.0D, 4000L)
.messageKeyGenerator(priorityMessageKeyGenerator)
.recoverer(miRepublishRecoverer)
.build();
}
@Bean(name="exec.priorityPOC")
TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor e = new ThreadPoolTaskExecutor();
e.setCorePoolSize(1);
e.setQueueCapacity(1);
return e;
}
/* @Bean(name="poc.priorityChannel")
public MessageChannel pocPriorityChannel() {
PriorityChannel c = new PriorityChannel(new PriorityComparator());
c.setComponentName("poc.priorityChannel");
c.setBeanName("poc.priorityChannel");
return c;
}
*/
@Bean(name="poc.inputChannel")
public MessageChannel pocPriorityChannel() {
DirectChannel c = new DirectChannel();
c.setComponentName("poc.inputChannel");
c.setBeanName("poc.inputChannel");
return c;
}
@Bean(name="poc.inboundChannelAdapter") //make this a unique name
public AmqpInboundChannelAdapter amqpInboundChannelAdapter(@Qualifier("exec.priorityPOC") TaskExecutor taskExecutor
, @Qualifier("errorChannel") MessageChannel pocErrorChannel
, @Qualifier("inadequateResourceInterceptor") StatefulRetryOperationsInterceptor inadequateResourceInterceptor) {
org.aopalliance.aop.Advice[] adviceChain = new org.aopalliance.aop.Advice[]{inadequateResourceInterceptor};
int concurrentConsumers = 1;
AmqpInboundChannelAdapter a = springIntegrationConfigHelper.createInboundChannelAdapter(taskExecutor
, pocPriorityChannel(), new Queue[]{priorityPOCQueue}, concurrentConsumers, adviceChain
, new PocErrorHandler());
a.setErrorChannel(pocErrorChannel);
return a;
}
@Transformer(inputChannel = "poc.inputChannel", outputChannel = "poc.procesPoc")
public Message<PriorityMessage> incrementAttempts(Message<PriorityMessage> msg) {
//I stopped using this in the POC.
return msg;
}
@ServiceActivator(inputChannel="poc.procesPoc")
public void procesPoc(@Header(SimulateErrorHeaderPostProcessor.ERROR_SIMULATE_HEADER_KEY) Boolean simulateError
, @Headers Map<String, Object> headerMap
, PriorityMessage priorityMessage) throws InterruptedException {
if (isFirstMessageReceived == false) {
//Thread.sleep(15000); //Cause a bit of a backup so we can see prioritizing in action.
isFirstMessageReceived = true;
}
Integer retryAttempts = 0;
if (headerMap.containsKey("jmattempts")) {
retryAttempts = Integer.valueOf(headerMap.get("jmattempts").toString());
}
logger.debug("Received message with priority: " + priorityMessage.getPriority() + ", simulateError: " + simulateError + ", Current attempts count is "
+ retryAttempts);
if (simulateError && retryAttempts < PriorityMessage.MAX_MESSAGE_RETRY_COUNT) {
logger.debug(" Simulating an error and re-queue'ng. Current attempt count is " + retryAttempts);
throw new AnalyzerNonAdequateResourceException();
} else if (simulateError && retryAttempts > PriorityMessage.MAX_MESSAGE_RETRY_COUNT) {
logger.debug(" Max attempt count exceeded");
}
}
/**************************************************************************************************
*
* Error Channel
*
**************************************************************************************************/
//Note that we want to override default Spring error channel, so the name of the bean must be errorChannel
@Bean(name="errorChannel")
public MessageChannel pocErrorChannel() {
DirectChannel c = new DirectChannel();
c.setComponentName("errorChannel");
c.setBeanName("errorChannel");
return c;
}
@ServiceActivator(inputChannel="errorChannel")
public void pocHandleError(Message<MessagingException> message) throws Throwable {
MessagingException me = message.getPayload();
logger.error("pocHandleError: error encountered: " + me.getCause().getClass().getName());
SortedMap<String, Object> sorted= new TreeMap<>();
sorted.putAll(me.getFailedMessage().getHeaders());
if (me.getCause() instanceof AnalyzerNonAdequateResourceException) {
logger.debug("Headers: " + sorted.toString());
//Let this message get requeued
throw me.getCause();
}
Message<?> failedMsg = me.getFailedMessage();
Object o = failedMsg.getPayload();
StringBuilder sb = new StringBuilder();
if (o != null) {
sb.append("AnalyzerErrorHandler: Failed Message Type: ")
.append(o.getClass().getCanonicalName()).append(". toString: ").append(o.toString());
logger.error(sb.toString());
}
//The first level sometimes brings back either MessagingHandlingException or
//MessagingTransformationException which may contain a subcause
Exception e = (Exception)me.getCause();
int i = 0;
sb.delete(0, sb.length());
sb.append("AnalyzerErrorHandler nested messages: ");
while (e != null && i++ < 10) {
sb.append(System.lineSeparator()).append(" ")
.append(e.getClass().getCanonicalName()).append(": ")
.append(e.getMessage());
}
if (i > 0) {
logger.error(sb.toString());
}
//Don't want a message to recycle
throw new AmqpRejectAndDontRequeueException(e);
}
/**
* This gets set on the ListenerContainer. The default handler on the listener
* container logs everything with full stack trace. We don't want to do that
* for our known resource exception
*/
public static class PocErrorHandler implements ErrorHandler {
@Override
public void handleError(Throwable t) {
Throwable cause = t.getCause();
if (cause != null) {
while (cause.getCause() != null) {
cause = cause.getCause();
}
} else {
cause = t;
}
if (cause instanceof AnalyzerNonAdequateResourceException) {
logger.info(AnalyzerNonAdequateResourceException.class.getName() + ": not enough resources to process the item.");
return;
}
else {
logger.error("POC Listener Exception", t);
}
}
}
SpringIntegrationHelper
protected ConnectionFactory connectionFactory;
protected MessageConverter messageConverter;
@Autowired
public void setConnectionFactory (ConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}
@Autowired
public void setMessageConverter(@Qualifier("jsonMessageConverter") MessageConverter messageConverter) {
this.messageConverter = messageConverter;
}
public AmqpInboundChannelAdapter createInboundChannelAdapter(TaskExecutor taskExecutor
, MessageChannel outputChannel, Queue[] queues, int concurrentConsumers
, org.aopalliance.aop.Advice[] adviceChain,
ErrorHandler errorHandler) {
SimpleMessageListenerContainer listenerContainer =
new SimpleMessageListenerContainer(connectionFactory);
//AUTO is default, but setting it anyhow.
listenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
listenerContainer.setAutoStartup(true);
listenerContainer.setConcurrentConsumers(concurrentConsumers);
listenerContainer.setMessageConverter(messageConverter);
listenerContainer.setQueues(queues);
//listenerContainer.setChannelTransacted(false);
listenerContainer.setErrorHandler(errorHandler);
listenerContainer.setPrefetchCount(1);
listenerContainer.setTaskExecutor(taskExecutor);
listenerContainer.setDefaultRequeueRejected(true);
if (adviceChain != null && adviceChain.length > 0) {
listenerContainer.setAdviceChain(adviceChain);
}
AmqpInboundChannelAdapter a = new AmqpInboundChannelAdapter(listenerContainer);
a.setMessageConverter(messageConverter);
a.setAutoStartup(true);
a.setHeaderMapper(MyAmqpHeaderMapper.createPassAllHeaders());
a.setOutputChannel(outputChannel);
return a;
}
不清楚为什么要在这种情况下使用 PriorityChannel
;为什么不使用 priority queue in RabbitMQ?这样,您就可以 运行 容器线程上的流程。
自己把队列放到队列后面也可以,但是有消息丢失的风险。
我有一个使用优先通道的 Spring 集成配置。当从该通道读取一个项目时,会在那个时间点检查本地资源,如果资源无法处理该项目,我想重新queue消息以便另一台机器接收它.最初,我错误地抛出了一个异常,认为会发生 requeue,但是
我考虑过在入站通道适配器之后放置一个过滤器,如果当时资源不可用则抛出异常,但在那个时候无法对资源进行准确评估,因为当时资源可用性当根据优先级选择消息时,时间确实与可用时间匹配。
我的下一个想法是在优先通道之后和服务激活器之前放置一个过滤器,并将当前资源无法处理的消息直接发送到 discard-channel,它被定义为一个出站通道适配器,它发送消息回到原来的queue。这种方法有缺陷吗?
编辑 20150917:
根据 Gary 的建议,我已转移到 RabbitMQ 3.5.x 以便获得 built-in 优先级 queue。我现在在跟踪尝试次数时遇到问题,因为我的原始消息似乎被放回了 queue,而不是我修改后的消息。我已更新代码块以反映当前设置。
编辑 20150922:
我正在更新此 post 以反映我创建的概念代码库的最终证明。无论如何,我都不是 Spring-Integration 专家,所以请记住这一点以及此测试代码尚未准备好生产的事实。我的初衷是在抛出特定异常时重新提交消息并重试一定次数。这可以使用 StatefulRetryOperationsInterceptor
来完成。但是为了进一步试验,我希望能够在失败时 set/increment a header 然后在我的流程中有一些东西可以对该值做出反应。这是通过使用覆盖 additionalHeaders()
的 RepublishMessageRecoverer
的扩展来实现的。这个 object 然后用于配置 RetryOperationsInterceptor
。
另一件小事:当我的信号异常被抛出时,我想减少一些默认的 Spring 集成日志记录,所以我需要确保我按顺序命名我的错误通道 "errorChannel"替换 Spring 集成默认值。我还需要创建一个自定义 ErrorHandler
分配给 ListenerContainer
默认值,它将所有内容记录到 ERROR 级别。
这是我当前的设置:
Spring 集成 4.2.0.RELEASE
Spring AMQP 1.5.0.RELEASE
RabbitMQ 3.5.x
配置
@Autowired
public void setSpringIntegrationConfigHelper (SpringIntegrationHelper springIntegrationConfigHelper) {
this.springIntegrationConfigHelper = springIntegrationConfigHelper;
}
@Bean
public String priorityPOCQueueName() {
return "poc.priority";
}
@Bean
public Queue priorityPOCQueue(RabbitAdmin rabbitAdmin) {
boolean durable = true;
boolean exclusive = false;
boolean autoDelete = false;
//Adding the x-max-priority argument is what signals RabbitMQ that this is a priority queue. Must be Rabbit 3.5.x
Map<String,Object> arguments = new HashMap<String, Object>();
arguments.put("x-max-priority", 5);
Queue queue = new Queue(priorityPOCQueueName(),
durable,
exclusive,
autoDelete,
arguments);
rabbitAdmin.declareQueue(queue);
return queue;
}
@Bean
public Binding priorityPOCQueueBinding(RabbitAdmin rabbitAdmin) {
Binding binding = new Binding(priorityPOCQueueName(),
DestinationType.QUEUE,
"amq.direct",
priorityPOCQueue(rabbitAdmin).getName(),
null);
rabbitAdmin.declareBinding(binding);
return binding;
}
@Bean
public AmqpTemplate priorityPOCMessageTemplate(ConnectionFactory amqpConnectionFactory,
@Qualifier("priorityPOCQueueName") String queueName,
@Qualifier("jsonMessageConverter") MessageConverter messageConverter) {
RabbitTemplate template = new RabbitTemplate(amqpConnectionFactory);
template.setChannelTransacted(false);
template.setExchange("amq.direct");
template.setQueue(queueName);
template.setRoutingKey(queueName);
template.setMessageConverter(messageConverter);
return template;
}
@Autowired
@Qualifier("priorityPOCQueue")
public void setPriorityPOCQueue(Queue priorityPOCQueue) {
this.priorityPOCQueue = priorityPOCQueue;
}
@Bean
public MessageRecoverer miTestMessageRecoverer(final AmqpTemplate priorityPOCMessageTemplate) {
return new MessageRecoverer() {
@Override
public void recover(org.springframework.amqp.core.Message msg, Throwable t) {
StringBuilder sb = new StringBuilder();
sb.append("Firing Test Recoverer: ").append(t.getClass().getName()).append(" Message Count: ")
.append(msg.getMessageProperties().getMessageCount())
.append(" ID: ").append(msg.getMessageProperties().getMessageId())
.append(" DeliveryTag: ").append(msg.getMessageProperties().getDeliveryTag())
.append(" Redilivered: ").append(msg.getMessageProperties().isRedelivered());
logger.debug(sb.toString());
PriorityMessage m = new PriorityMessage(5);
m.setId(randomGenerator.nextLong(10L, 1000000L));
priorityPOCMessageTemplate.convertAndSend(m , new SimulateErrorHeaderPostProcessor(Boolean.FALSE, m.getPriority()));
}
};
}
@Bean
public RepublishMessageRecoverer miRepublishRecoverer(final AmqpTemplate priorityPOCMessageTemplate) {
class MiRecoverer extends RepublishMessageRecoverer {
public MiRecoverer(AmqpTemplate errorTemplate) {
super(errorTemplate);
this.setErrorRoutingKeyPrefix("");
}
@Override
protected Map<? extends String, ? extends Object> additionalHeaders(
org.springframework.amqp.core.Message message, Throwable cause) {
Map<String, Object> map = new HashMap<>();
if (message.getMessageProperties().getHeaders().containsKey("jmattempts") == false) {
map.put("jmattempts", 0);
} else {
Integer count = Integer.valueOf(message.getMessageProperties().getHeaders().get("jmattempts").toString());
map.put("jmattempts", ++count);
}
return map;
}
} ;
return new MiRecoverer(priorityPOCMessageTemplate);
}
@Bean
public StatefulRetryOperationsInterceptor inadequateResourceInterceptor(@Qualifier("priorityPOCMessageTemplate") AmqpTemplate priorityPOCMessageTemplate
, @Qualifier("priorityMessageKeyGenerator") PriorityMessageKeyGenerator priorityMessageKeyGenerator
, @Qualifier("miTestMessageRecoverer") MessageRecoverer messageRecoverer
, @Qualifier("miRepublishRecoverer") RepublishMessageRecoverer miRepublishRecoverer) {
StatefulRetryInterceptorBuilder b = RetryInterceptorBuilder.stateful();
return b.maxAttempts(2)
.backOffOptions(2000L, 1.0D, 4000L)
.messageKeyGenerator(priorityMessageKeyGenerator)
.recoverer(miRepublishRecoverer)
.build();
}
@Bean(name="exec.priorityPOC")
TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor e = new ThreadPoolTaskExecutor();
e.setCorePoolSize(1);
e.setQueueCapacity(1);
return e;
}
/* @Bean(name="poc.priorityChannel")
public MessageChannel pocPriorityChannel() {
PriorityChannel c = new PriorityChannel(new PriorityComparator());
c.setComponentName("poc.priorityChannel");
c.setBeanName("poc.priorityChannel");
return c;
}
*/
@Bean(name="poc.inputChannel")
public MessageChannel pocPriorityChannel() {
DirectChannel c = new DirectChannel();
c.setComponentName("poc.inputChannel");
c.setBeanName("poc.inputChannel");
return c;
}
@Bean(name="poc.inboundChannelAdapter") //make this a unique name
public AmqpInboundChannelAdapter amqpInboundChannelAdapter(@Qualifier("exec.priorityPOC") TaskExecutor taskExecutor
, @Qualifier("errorChannel") MessageChannel pocErrorChannel
, @Qualifier("inadequateResourceInterceptor") StatefulRetryOperationsInterceptor inadequateResourceInterceptor) {
org.aopalliance.aop.Advice[] adviceChain = new org.aopalliance.aop.Advice[]{inadequateResourceInterceptor};
int concurrentConsumers = 1;
AmqpInboundChannelAdapter a = springIntegrationConfigHelper.createInboundChannelAdapter(taskExecutor
, pocPriorityChannel(), new Queue[]{priorityPOCQueue}, concurrentConsumers, adviceChain
, new PocErrorHandler());
a.setErrorChannel(pocErrorChannel);
return a;
}
@Transformer(inputChannel = "poc.inputChannel", outputChannel = "poc.procesPoc")
public Message<PriorityMessage> incrementAttempts(Message<PriorityMessage> msg) {
//I stopped using this in the POC.
return msg;
}
@ServiceActivator(inputChannel="poc.procesPoc")
public void procesPoc(@Header(SimulateErrorHeaderPostProcessor.ERROR_SIMULATE_HEADER_KEY) Boolean simulateError
, @Headers Map<String, Object> headerMap
, PriorityMessage priorityMessage) throws InterruptedException {
if (isFirstMessageReceived == false) {
//Thread.sleep(15000); //Cause a bit of a backup so we can see prioritizing in action.
isFirstMessageReceived = true;
}
Integer retryAttempts = 0;
if (headerMap.containsKey("jmattempts")) {
retryAttempts = Integer.valueOf(headerMap.get("jmattempts").toString());
}
logger.debug("Received message with priority: " + priorityMessage.getPriority() + ", simulateError: " + simulateError + ", Current attempts count is "
+ retryAttempts);
if (simulateError && retryAttempts < PriorityMessage.MAX_MESSAGE_RETRY_COUNT) {
logger.debug(" Simulating an error and re-queue'ng. Current attempt count is " + retryAttempts);
throw new AnalyzerNonAdequateResourceException();
} else if (simulateError && retryAttempts > PriorityMessage.MAX_MESSAGE_RETRY_COUNT) {
logger.debug(" Max attempt count exceeded");
}
}
/**************************************************************************************************
*
* Error Channel
*
**************************************************************************************************/
//Note that we want to override default Spring error channel, so the name of the bean must be errorChannel
@Bean(name="errorChannel")
public MessageChannel pocErrorChannel() {
DirectChannel c = new DirectChannel();
c.setComponentName("errorChannel");
c.setBeanName("errorChannel");
return c;
}
@ServiceActivator(inputChannel="errorChannel")
public void pocHandleError(Message<MessagingException> message) throws Throwable {
MessagingException me = message.getPayload();
logger.error("pocHandleError: error encountered: " + me.getCause().getClass().getName());
SortedMap<String, Object> sorted= new TreeMap<>();
sorted.putAll(me.getFailedMessage().getHeaders());
if (me.getCause() instanceof AnalyzerNonAdequateResourceException) {
logger.debug("Headers: " + sorted.toString());
//Let this message get requeued
throw me.getCause();
}
Message<?> failedMsg = me.getFailedMessage();
Object o = failedMsg.getPayload();
StringBuilder sb = new StringBuilder();
if (o != null) {
sb.append("AnalyzerErrorHandler: Failed Message Type: ")
.append(o.getClass().getCanonicalName()).append(". toString: ").append(o.toString());
logger.error(sb.toString());
}
//The first level sometimes brings back either MessagingHandlingException or
//MessagingTransformationException which may contain a subcause
Exception e = (Exception)me.getCause();
int i = 0;
sb.delete(0, sb.length());
sb.append("AnalyzerErrorHandler nested messages: ");
while (e != null && i++ < 10) {
sb.append(System.lineSeparator()).append(" ")
.append(e.getClass().getCanonicalName()).append(": ")
.append(e.getMessage());
}
if (i > 0) {
logger.error(sb.toString());
}
//Don't want a message to recycle
throw new AmqpRejectAndDontRequeueException(e);
}
/**
* This gets set on the ListenerContainer. The default handler on the listener
* container logs everything with full stack trace. We don't want to do that
* for our known resource exception
*/
public static class PocErrorHandler implements ErrorHandler {
@Override
public void handleError(Throwable t) {
Throwable cause = t.getCause();
if (cause != null) {
while (cause.getCause() != null) {
cause = cause.getCause();
}
} else {
cause = t;
}
if (cause instanceof AnalyzerNonAdequateResourceException) {
logger.info(AnalyzerNonAdequateResourceException.class.getName() + ": not enough resources to process the item.");
return;
}
else {
logger.error("POC Listener Exception", t);
}
}
}
SpringIntegrationHelper
protected ConnectionFactory connectionFactory;
protected MessageConverter messageConverter;
@Autowired
public void setConnectionFactory (ConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}
@Autowired
public void setMessageConverter(@Qualifier("jsonMessageConverter") MessageConverter messageConverter) {
this.messageConverter = messageConverter;
}
public AmqpInboundChannelAdapter createInboundChannelAdapter(TaskExecutor taskExecutor
, MessageChannel outputChannel, Queue[] queues, int concurrentConsumers
, org.aopalliance.aop.Advice[] adviceChain,
ErrorHandler errorHandler) {
SimpleMessageListenerContainer listenerContainer =
new SimpleMessageListenerContainer(connectionFactory);
//AUTO is default, but setting it anyhow.
listenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
listenerContainer.setAutoStartup(true);
listenerContainer.setConcurrentConsumers(concurrentConsumers);
listenerContainer.setMessageConverter(messageConverter);
listenerContainer.setQueues(queues);
//listenerContainer.setChannelTransacted(false);
listenerContainer.setErrorHandler(errorHandler);
listenerContainer.setPrefetchCount(1);
listenerContainer.setTaskExecutor(taskExecutor);
listenerContainer.setDefaultRequeueRejected(true);
if (adviceChain != null && adviceChain.length > 0) {
listenerContainer.setAdviceChain(adviceChain);
}
AmqpInboundChannelAdapter a = new AmqpInboundChannelAdapter(listenerContainer);
a.setMessageConverter(messageConverter);
a.setAutoStartup(true);
a.setHeaderMapper(MyAmqpHeaderMapper.createPassAllHeaders());
a.setOutputChannel(outputChannel);
return a;
}
不清楚为什么要在这种情况下使用 PriorityChannel
;为什么不使用 priority queue in RabbitMQ?这样,您就可以 运行 容器线程上的流程。
自己把队列放到队列后面也可以,但是有消息丢失的风险。