Spring Boot RabbitMQ:死信交换器 (DLE) 收不到任何消息
spring boot rabbitMQ DLE not accepting any messages
我正在研究 Spring Boot 与 RabbitMQ。我创建了一个死信队列,在 RabbitMQ 管理界面中可以看到它带有 "D,DLE" 标记,但没有 DLK。可能是因为我没有设置 "x-dead-letter-routing-key",但问题是我并不想使用路由键。我有若干消费者绑定到某个特定的交换器,我为每个交换器创建一个 DLE;如果该交换器的某个消费者出现任何问题,附加到该交换器的 DLE 就应当接收该消息并执行用户自定义的逻辑。但不幸的是这并不起作用,DLE 没有收到任何消息。
请找到下面的代码,
package com.sample.rabbit;
import org.slf4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Argument;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler;
import org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException;
import org.springframework.amqp.support.converter.DefaultClassMapper;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.util.ErrorHandler;
@SpringBootApplication
public class SampleRabbitApplication {
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(SampleRabbitApplication.class, args);
context.getBean(SampleRabbitApplication.class).runDemo();
context.close();
}
@Autowired
private RabbitTemplate template;
private void runDemo() throws Exception {
this.template.convertAndSend("sample-queue", new Foo("bar"),m -> {
m.getMessageProperties().setHeader("__TypeId__","foo");
return m;
});
this.template.convertAndSend("sample-queue", new Foo("throw"),m -> {
m.getMessageProperties().setHeader("__TypeId__","foo");
return m;
});
this.template.convertAndSend("sample-queue", new Foo("bar"), m -> {
return new Message("some bad json".getBytes(), m.getMessageProperties());
});
Thread.sleep(5000);
}
@RabbitListener(
id = "sample-queue",
bindings = @QueueBinding(
value = @org.springframework.amqp.rabbit.annotation.Queue(value = "sample-queue", durable = "true"),
exchange = @org.springframework.amqp.rabbit.annotation.Exchange(value = "sample.exchange", durable = "true")
)
)
public void handle(Foo in) {
System.out.println("Received: " + in);
if("throw".equalsIgnoreCase(in.getFoo())){
throw new BadRequestException("Foo contains throw so it throwed the exception.");
}
}
@RabbitListener(
id = "sample-dead-letter-queue",
bindings = @QueueBinding(
value = @org.springframework.amqp.rabbit.annotation.Queue(value = "sample-dead-letter-queue", durable = "true", arguments = {@Argument(name = "x-dead-letter-exchange",value = "sample.exchange"),@Argument(name = "x-dead-letter-routing-key",value = "#")}),
exchange = @org.springframework.amqp.rabbit.annotation.Exchange(value = "critical.exchange", durable = "true",type = "topic")
)
)
public void handleDLE(Message in) {
System.out.println("Received in DLE: " + in.getBody());
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(jsonConverter());
factory.setErrorHandler(errorHandler());
return factory;
}
@Bean
public ErrorHandler errorHandler() {
return new ConditionalRejectingErrorHandler(new MyFatalExceptionStrategy());
}
@Bean
public MessageConverter jsonConverter() {
Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
DefaultClassMapper mapper = new DefaultClassMapper();
mapper.setDefaultType(Foo.class);
converter.setClassMapper(mapper);
return new Jackson2JsonMessageConverter();
}
public static class MyFatalExceptionStrategy extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy {
private final Logger LOG = org.slf4j.LoggerFactory.getLogger(getClass());
public boolean isFatal(Throwable t) {
if (t instanceof ListenerExecutionFailedException && isCauseFatal(t.getCause())) {
//To do : Here we have to configure DLE(Critical queue) and put all the messages in the critical queue.
ListenerExecutionFailedException lefe = (ListenerExecutionFailedException) t;
if(lefe.getFailedMessage() != null) {
LOG.info("Failed to process inbound message from queue "
+ lefe.getFailedMessage().getMessageProperties().getConsumerQueue()
+ "; failed message: " + lefe.getFailedMessage(), t);
} else {
LOG.info("Failed to process inbound message from queue "
+ lefe.getMessage(), t);
}
}
return super.isFatal(t);
}
private boolean isCauseFatal(Throwable cause) {
return cause instanceof MessageConversionException
|| cause instanceof org.springframework.messaging.converter.MessageConversionException
|| cause instanceof MethodArgumentNotValidException
|| cause instanceof MethodArgumentTypeMismatchException
|| cause instanceof NoSuchMethodException
|| cause instanceof ClassCastException
|| isUserCauseFatal(cause);
}
/**
* Subclasses can override this to add custom exceptions.
* @param cause the cause
* @return true if the cause is fatal.
*/
protected boolean isUserCauseFatal(Throwable cause) {
return true;
}
}
public static class Foo {
private String foo;
public Foo() {
super();
}
public Foo(String foo) {
this.foo = foo;
}
public String getFoo() {
return this.foo;
}
public void setFoo(String foo) {
this.foo = foo;
}
@Override
public String toString() {
return "Foo [foo=" + this.foo + "]";
}
}
}
我的交换器和队列都是 direct 类型,我的每个消费者使用不同的路由键,但它们属于同一个交换器。那么我该如何编写一个能够接收所有失败消息的 DLE?在上面的代码示例中,一条消息处理成功,另一条消息处理失败,但我在 DLE 中看不到失败的消息。
如有任何帮助,我们将不胜感激。
如果您使用死信交换 (DLX) 配置队列但没有死信路由键,则消息将使用原始路由键路由到 DLX。
处理您的用例的最简单方法是使 DLX 成为主题交换并使用路由键 #
(所有消息的通配符)将队列绑定到它,所有错误都将进入该队列。
如果你想将错误隔离到单独的队列中,然后使用原始路由键为每个队列绑定一个 DLQ。
编辑
正确的配置如下:
/**
 * Consumes payloads from "sample-queue" (bound to "sample.exchange") and prints them.
 * The queue declares "critical.exchange" as its dead-letter exchange.
 *
 * @param in the converted payload
 */
@RabbitListener(
        id = "sample-queue",
        bindings = @QueueBinding(
                value = @Queue(
                        value = "sample-queue",
                        durable = "true",
                        arguments = @Argument(name = "x-dead-letter-exchange", value = "critical.exchange")),
                exchange = @Exchange(value = "sample.exchange", durable = "true")))
public void handle(Foo in) {
    final String line = "Received: " + in;
    System.out.println(line);
}
/**
 * Consumes every message routed through the topic exchange "critical.exchange"
 * (binding key "#") into "sample-dead-letter-queue" and prints its body.
 * Uses the "noJsonContainerFactory" container factory.
 *
 * @param in the raw dead-lettered message
 */
@RabbitListener(id = "sample-dead-letter-queue", containerFactory = "noJsonContainerFactory",
        bindings = @QueueBinding(value = @Queue(value = "sample-dead-letter-queue", durable = "true"),
                exchange = @Exchange(value = "critical.exchange", durable = "true", type = "topic"),
                key = "#"))
public void handleDLE(Message in) {
    // Decode with an explicit charset instead of the platform default.
    System.out.println("Received in DLE: "
            + new String(in.getBody(), java.nio.charset.StandardCharsets.UTF_8));
}
/**
 * Listener container factory with the shared error handler and no message
 * converter set on it.
 *
 * @param connectionFactory the broker connection factory
 * @return the configured factory
 */
@Bean
public SimpleRabbitListenerContainerFactory noJsonContainerFactory(ConnectionFactory connectionFactory) {
    final SimpleRabbitListenerContainerFactory containerFactory =
            new SimpleRabbitListenerContainerFactory();
    containerFactory.setConnectionFactory(connectionFactory);
    containerFactory.setErrorHandler(errorHandler());
    return containerFactory;
}
我正在研究 Spring Boot 与 RabbitMQ。我创建了一个死信队列,在 RabbitMQ 管理界面中可以看到它带有 "D,DLE" 标记,但没有 DLK。可能是因为我没有设置 "x-dead-letter-routing-key",但问题是我并不想使用路由键。我有若干消费者绑定到某个特定的交换器,我为每个交换器创建一个 DLE;如果该交换器的某个消费者出现任何问题,附加到该交换器的 DLE 就应当接收该消息并执行用户自定义的逻辑。但不幸的是这并不起作用,DLE 没有收到任何消息。
请找到下面的代码,
package com.sample.rabbit;
import org.slf4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Argument;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler;
import org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException;
import org.springframework.amqp.support.converter.DefaultClassMapper;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.util.ErrorHandler;
@SpringBootApplication
public class SampleRabbitApplication {
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(SampleRabbitApplication.class, args);
context.getBean(SampleRabbitApplication.class).runDemo();
context.close();
}
@Autowired
private RabbitTemplate template;
private void runDemo() throws Exception {
this.template.convertAndSend("sample-queue", new Foo("bar"),m -> {
m.getMessageProperties().setHeader("__TypeId__","foo");
return m;
});
this.template.convertAndSend("sample-queue", new Foo("throw"),m -> {
m.getMessageProperties().setHeader("__TypeId__","foo");
return m;
});
this.template.convertAndSend("sample-queue", new Foo("bar"), m -> {
return new Message("some bad json".getBytes(), m.getMessageProperties());
});
Thread.sleep(5000);
}
@RabbitListener(
id = "sample-queue",
bindings = @QueueBinding(
value = @org.springframework.amqp.rabbit.annotation.Queue(value = "sample-queue", durable = "true"),
exchange = @org.springframework.amqp.rabbit.annotation.Exchange(value = "sample.exchange", durable = "true")
)
)
public void handle(Foo in) {
System.out.println("Received: " + in);
if("throw".equalsIgnoreCase(in.getFoo())){
throw new BadRequestException("Foo contains throw so it throwed the exception.");
}
}
@RabbitListener(
id = "sample-dead-letter-queue",
bindings = @QueueBinding(
value = @org.springframework.amqp.rabbit.annotation.Queue(value = "sample-dead-letter-queue", durable = "true", arguments = {@Argument(name = "x-dead-letter-exchange",value = "sample.exchange"),@Argument(name = "x-dead-letter-routing-key",value = "#")}),
exchange = @org.springframework.amqp.rabbit.annotation.Exchange(value = "critical.exchange", durable = "true",type = "topic")
)
)
public void handleDLE(Message in) {
System.out.println("Received in DLE: " + in.getBody());
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(jsonConverter());
factory.setErrorHandler(errorHandler());
return factory;
}
@Bean
public ErrorHandler errorHandler() {
return new ConditionalRejectingErrorHandler(new MyFatalExceptionStrategy());
}
@Bean
public MessageConverter jsonConverter() {
Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
DefaultClassMapper mapper = new DefaultClassMapper();
mapper.setDefaultType(Foo.class);
converter.setClassMapper(mapper);
return new Jackson2JsonMessageConverter();
}
public static class MyFatalExceptionStrategy extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy {
private final Logger LOG = org.slf4j.LoggerFactory.getLogger(getClass());
public boolean isFatal(Throwable t) {
if (t instanceof ListenerExecutionFailedException && isCauseFatal(t.getCause())) {
//To do : Here we have to configure DLE(Critical queue) and put all the messages in the critical queue.
ListenerExecutionFailedException lefe = (ListenerExecutionFailedException) t;
if(lefe.getFailedMessage() != null) {
LOG.info("Failed to process inbound message from queue "
+ lefe.getFailedMessage().getMessageProperties().getConsumerQueue()
+ "; failed message: " + lefe.getFailedMessage(), t);
} else {
LOG.info("Failed to process inbound message from queue "
+ lefe.getMessage(), t);
}
}
return super.isFatal(t);
}
private boolean isCauseFatal(Throwable cause) {
return cause instanceof MessageConversionException
|| cause instanceof org.springframework.messaging.converter.MessageConversionException
|| cause instanceof MethodArgumentNotValidException
|| cause instanceof MethodArgumentTypeMismatchException
|| cause instanceof NoSuchMethodException
|| cause instanceof ClassCastException
|| isUserCauseFatal(cause);
}
/**
* Subclasses can override this to add custom exceptions.
* @param cause the cause
* @return true if the cause is fatal.
*/
protected boolean isUserCauseFatal(Throwable cause) {
return true;
}
}
public static class Foo {
private String foo;
public Foo() {
super();
}
public Foo(String foo) {
this.foo = foo;
}
public String getFoo() {
return this.foo;
}
public void setFoo(String foo) {
this.foo = foo;
}
@Override
public String toString() {
return "Foo [foo=" + this.foo + "]";
}
}
}
我的交换器和队列都是 direct 类型,我的每个消费者使用不同的路由键,但它们属于同一个交换器。那么我该如何编写一个能够接收所有失败消息的 DLE?在上面的代码示例中,一条消息处理成功,另一条消息处理失败,但我在 DLE 中看不到失败的消息。
如有任何帮助,我们将不胜感激。
如果您使用死信交换 (DLX) 配置队列但没有死信路由键,则消息将使用原始路由键路由到 DLX。
处理您的用例的最简单方法是使 DLX 成为主题交换并使用路由键 #
(所有消息的通配符)将队列绑定到它,所有错误都将进入该队列。
如果你想将错误隔离到单独的队列中,然后使用原始路由键为每个队列绑定一个 DLQ。
编辑
正确的配置如下:
/**
 * Consumes payloads from "sample-queue" (bound to "sample.exchange") and prints them.
 * The queue declares "critical.exchange" as its dead-letter exchange.
 *
 * @param in the converted payload
 */
@RabbitListener(
        id = "sample-queue",
        bindings = @QueueBinding(
                value = @Queue(
                        value = "sample-queue",
                        durable = "true",
                        arguments = @Argument(name = "x-dead-letter-exchange", value = "critical.exchange")),
                exchange = @Exchange(value = "sample.exchange", durable = "true")))
public void handle(Foo in) {
    final String line = "Received: " + in;
    System.out.println(line);
}
/**
 * Consumes every message routed through the topic exchange "critical.exchange"
 * (binding key "#") into "sample-dead-letter-queue" and prints its body.
 * Uses the "noJsonContainerFactory" container factory.
 *
 * @param in the raw dead-lettered message
 */
@RabbitListener(id = "sample-dead-letter-queue", containerFactory = "noJsonContainerFactory",
        bindings = @QueueBinding(value = @Queue(value = "sample-dead-letter-queue", durable = "true"),
                exchange = @Exchange(value = "critical.exchange", durable = "true", type = "topic"),
                key = "#"))
public void handleDLE(Message in) {
    // Decode with an explicit charset instead of the platform default.
    System.out.println("Received in DLE: "
            + new String(in.getBody(), java.nio.charset.StandardCharsets.UTF_8));
}
/**
 * Listener container factory with the shared error handler and no message
 * converter set on it.
 *
 * @param connectionFactory the broker connection factory
 * @return the configured factory
 */
@Bean
public SimpleRabbitListenerContainerFactory noJsonContainerFactory(ConnectionFactory connectionFactory) {
    final SimpleRabbitListenerContainerFactory containerFactory =
            new SimpleRabbitListenerContainerFactory();
    containerFactory.setConnectionFactory(connectionFactory);
    containerFactory.setErrorHandler(errorHandler());
    return containerFactory;
}