Spring 作为一种类型发送的 AMQP RabbitMQ 对象在监听器中转换为映射
Spring AMQP RabbitMQ Object sent as one type gets converted to Map in Listener
在我的应用程序中,RabbitTemplate 将对象(EventMessage - 下面的代码)发送到队列。然而,在 RabbitListener 和 RabbitHandler 中,EventMessage 包含的 EmailMessage 对象在反序列化期间被转换为 LinkedHashmap
但是 MessageProperties 显示类型是 EventMessage
2020-09-23 18:39:47.712 WARN 16676 --- [ntContainer#0-1] o.s.a.r.r.RejectAndDontRequeueRecoverer : Retries exhausted for message (Body:'{"type":101,"params":{"emailMessage":{"hasTo":true,"hasCc":false,"hasBcc":false,"template":null,"templateParams":null,"html":false,"toAddresses":["test@test.com"],"ccAddresses":null,"bccAddresses":null,"fromAddress":"test@mhserver.com","subject":"Test Subject","message":"An email has been sent to your registered email to reset your password","isHtml":false}}}' MessageProperties [headers={__TypeId__=in.teamnexus.mq.EventMessage}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=eventExchange, receivedRoutingKey=route.key.email, deliveryTag=1, consumerTag=amq.ctag-eCfxhUrcjiCF4lA7x8ZLNg, consumerQueue=queue.email])
Caused by: java.lang.ClassCastException: class java.util.LinkedHashMap cannot be cast to class in.teamnexus.email.EmailMessage (java.util.LinkedHashMap is in module java.base of loader 'bootstrap'; in.teamnexus.email.EmailMessage is in unnamed module of loader org.springframework.boot.devtools.restart.classloader.RestartClassLoader @ac49aa4)
at in.teamnexus.mq.EmailQueueListener.handleEvent(EmailQueueListener.java:34) ~[classes/:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:564) ~[na:na]
以下是 RabbitConfig
@Configuration
@EnableRabbit
@PropertySource("classpath:custom.properties")
public class RabbitConfig
{
@Bean
public ConnectionFactory rabbitConnectionFactory()
{
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(rabbitMQhost);
connectionFactory.setUsername(rabbitMQUsername);
connectionFactory.setPassword(rabbitMQPassword);
connectionFactory.setVirtualHost(rabbitMQVirtualHost);
connectionFactory.setPort(rabbitMQPort);
return connectionFactory;
}
@Bean
public MessageConverter messageConverter()
{
Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter(new ObjectMapper());
return messageConverter;
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory()
{
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setMessageConverter(messageConverter());
factory.setConnectionFactory(rabbitConnectionFactory());
factory.setConcurrentConsumers(20);
factory.setPrefetchCount(1);
factory.setMaxConcurrentConsumers(100);
factory.setAdviceChain(retryInterceptor());
return factory;
}
@Bean
public RetryOperationsInterceptor retryInterceptor()
{
return RetryInterceptorBuilder.stateless().maxAttempts(5).backOffOptions(1000, 2.0, 10000)
.recoverer(new RejectAndDontRequeueRecoverer()).build();
}
@Bean
public Queue mailQueue()
{
return new Queue("queue.email", true);
}
@Bean
public RabbitTemplate rabbitTemplate()
{
RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory());
rabbitTemplate.setMessageConverter(messageConverter());
return rabbitTemplate;
}
@Bean
public AmqpAdmin amqpAdmin()
{
RabbitAdmin rabbitAdmin = new RabbitAdmin(rabbitConnectionFactory());
return rabbitAdmin;
}
@Bean
public Exchange exchange()
{
AmqpAdmin rabbitAdmin = amqpAdmin();
DirectExchange dirExchange = new DirectExchange("eventExchange", true, false);
rabbitAdmin.declareExchange(dirExchange);
rabbitAdmin.declareQueue(mailQueue());
Binding emailBinding = BindingBuilder.bind(mailQueue()).to(dirExchange).with(Constants.ROUTE_KEY_EMAIL);
rabbitAdmin.declareBinding(emailBinding);
rabbitAdmin.declareBinding(retryBinding);
return dirExchange;
}
@Bean
Publisher publisher()
{
PublisherImpl publisher = new PublisherImpl();
return publisher;
}
@Bean
EmailQueueListener emailQueueListener()
{
return new EmailQueueListener();
}
}
事件消息class
public class EventMessage<T> implements Serializable
{
public static final int TYPE_EMAIL = 101;
/**
*
*/
private static final long serialVersionUID = 1846120191276045453L;
@JsonProperty("type")
private int type;
@JsonProperty("params")
private Map<String, T> params;
public EventMessage()
{
}
public EventMessage(int type, Map<String, T> params)
{
this.type = type;
this.params = params;
}
// Getters Setters...
}
PublisherImpl class
public class PublisherImpl implements Publisher
{
private Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public <T> EventResponse publishMessage(EventMessage<T> message, boolean async)
{
EventResponse response = new EventResponse();
if (async)
{
int resp = doPublish(message);
if (resp == EventResponse.EVENT_SUCCESS)
{
response.setStatus(EventResponse.EVENT_SUCCESS);
response.setMessage("Event Successfully published to the queue");
}
else
{
response.setStatus(EventResponse.EVENT_FAILURE);
response.setMessage("Failed to publish Event to the queue");
}
}
else
{
doSyncOp(message)
}
return response;
}
private <T> int doPublish(EventMessage<T> message)
{
String routingKey = Constants.ROUTE_KEY_EVENT;
int retVal = EventResponse.EVENT_SUCCESS;
try
{
switch (message.getType())
{
case EventMessage.TYPE_EMAIL:
{
routingKey = Constants.ROUTE_KEY_EMAIL;
this.rabbitTemplate.convertAndSend("eventExchange", routingKey, message);
break;
}
}
}
catch (AmqpException e)
{
retVal = EventResponse.EVENT_FAILURE;
logger.debug("Unable to push to the queue", e);
}
return retVal;
}
// Getters/Setters
}
EmailQueueListener class
@RabbitListener(containerFactory = "rabbitListenerContainerFactory", queues = "queue.email")
public class EmailQueueListener
{
Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private EmailSender emailSender;
@RabbitHandler
void handleEvent(EventMessage<EmailMessage> message)
{
logger.debug("### RabbitHandler Email: Receiving in listener:" + message);
Map<String, EmailMessage> params = message.getParams();
logger.debug("### emailMessage: " + params.get("emailMessage") + " class:" + params.get("emailMessage").getClass());
EmailMessage email = (EmailMessage) params.get("emailMessage");
emailSender.sendEmail(email);
}
// Getters/Setters
}
在调用 params.get("emailMessage") 的那一行是我得到异常和配置的重试次数的地方。我不确定我是否做错了什么。
编辑
这是发布消息的代码
public class EmailHelper
{
@Autowired
private Publisher publisher;
public void sendEmail(String to, String cc, String bcc, String subject, String text, boolean isHtml)
{
EmailMessage emailMessage = new EmailMessage();
emailMessage.setToAddresses(new String[] { to });
if (cc != null && !cc.isEmpty())
{
emailMessage.setCcAddresses(new String[] { cc });
}
if (bcc != null && !bcc.isEmpty())
{
emailMessage.setBccAddresses(new String[] { bcc });
}
emailMessage.setFromAddress("test@mhserver.com");
emailMessage.setSubject(subject);
emailMessage.setMessage(text);
Map<String, EmailMessage> params = new HashMap<>();
params.put("emailMessage", emailMessage);
EventMessage<EmailMessage> evtMsg = new EventMessage<>(EventMessage.TYPE_EMAIL, params);
publisher.<EmailMessage>publishMessage(evtMsg, true);
}
因为EventMessage
是泛型;消息转换器中的默认类型映射器无法处理任意泛型类型。
如果 @RabbitListener
定义在方法级别而不是 class 级别,它将起作用,因为我们可以从侦听器方法参数推断通用类型。
否则,您将需要为消息转换器创建自定义类型映射器。
在我的应用程序中,RabbitTemplate 将对象(EventMessage - 下面的代码)发送到队列。然而,在 RabbitListener 和 RabbitHandler 中,EventMessage 包含的 EmailMessage 对象在反序列化期间被转换为 LinkedHashmap
但是 MessageProperties 显示类型是 EventMessage
2020-09-23 18:39:47.712 WARN 16676 --- [ntContainer#0-1] o.s.a.r.r.RejectAndDontRequeueRecoverer : Retries exhausted for message (Body:'{"type":101,"params":{"emailMessage":{"hasTo":true,"hasCc":false,"hasBcc":false,"template":null,"templateParams":null,"html":false,"toAddresses":["test@test.com"],"ccAddresses":null,"bccAddresses":null,"fromAddress":"test@mhserver.com","subject":"Test Subject","message":"An email has been sent to your registered email to reset your password","isHtml":false}}}' MessageProperties [headers={__TypeId__=in.teamnexus.mq.EventMessage}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=eventExchange, receivedRoutingKey=route.key.email, deliveryTag=1, consumerTag=amq.ctag-eCfxhUrcjiCF4lA7x8ZLNg, consumerQueue=queue.email])
Caused by: java.lang.ClassCastException: class java.util.LinkedHashMap cannot be cast to class in.teamnexus.email.EmailMessage (java.util.LinkedHashMap is in module java.base of loader 'bootstrap'; in.teamnexus.email.EmailMessage is in unnamed module of loader org.springframework.boot.devtools.restart.classloader.RestartClassLoader @ac49aa4)
at in.teamnexus.mq.EmailQueueListener.handleEvent(EmailQueueListener.java:34) ~[classes/:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:564) ~[na:na]
以下是 RabbitConfig
@Configuration
@EnableRabbit
@PropertySource("classpath:custom.properties")
public class RabbitConfig
{
@Bean
public ConnectionFactory rabbitConnectionFactory()
{
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(rabbitMQhost);
connectionFactory.setUsername(rabbitMQUsername);
connectionFactory.setPassword(rabbitMQPassword);
connectionFactory.setVirtualHost(rabbitMQVirtualHost);
connectionFactory.setPort(rabbitMQPort);
return connectionFactory;
}
@Bean
public MessageConverter messageConverter()
{
Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter(new ObjectMapper());
return messageConverter;
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory()
{
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setMessageConverter(messageConverter());
factory.setConnectionFactory(rabbitConnectionFactory());
factory.setConcurrentConsumers(20);
factory.setPrefetchCount(1);
factory.setMaxConcurrentConsumers(100);
factory.setAdviceChain(retryInterceptor());
return factory;
}
@Bean
public RetryOperationsInterceptor retryInterceptor()
{
return RetryInterceptorBuilder.stateless().maxAttempts(5).backOffOptions(1000, 2.0, 10000)
.recoverer(new RejectAndDontRequeueRecoverer()).build();
}
@Bean
public Queue mailQueue()
{
return new Queue("queue.email", true);
}
@Bean
public RabbitTemplate rabbitTemplate()
{
RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory());
rabbitTemplate.setMessageConverter(messageConverter());
return rabbitTemplate;
}
@Bean
public AmqpAdmin amqpAdmin()
{
RabbitAdmin rabbitAdmin = new RabbitAdmin(rabbitConnectionFactory());
return rabbitAdmin;
}
@Bean
public Exchange exchange()
{
AmqpAdmin rabbitAdmin = amqpAdmin();
DirectExchange dirExchange = new DirectExchange("eventExchange", true, false);
rabbitAdmin.declareExchange(dirExchange);
rabbitAdmin.declareQueue(mailQueue());
Binding emailBinding = BindingBuilder.bind(mailQueue()).to(dirExchange).with(Constants.ROUTE_KEY_EMAIL);
rabbitAdmin.declareBinding(emailBinding);
rabbitAdmin.declareBinding(retryBinding);
return dirExchange;
}
@Bean
Publisher publisher()
{
PublisherImpl publisher = new PublisherImpl();
return publisher;
}
@Bean
EmailQueueListener emailQueueListener()
{
return new EmailQueueListener();
}
}
事件消息class
public class EventMessage<T> implements Serializable
{
public static final int TYPE_EMAIL = 101;
/**
*
*/
private static final long serialVersionUID = 1846120191276045453L;
@JsonProperty("type")
private int type;
@JsonProperty("params")
private Map<String, T> params;
public EventMessage()
{
}
public EventMessage(int type, Map<String, T> params)
{
this.type = type;
this.params = params;
}
// Getters Setters...
}
PublisherImpl class
public class PublisherImpl implements Publisher
{
private Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public <T> EventResponse publishMessage(EventMessage<T> message, boolean async)
{
EventResponse response = new EventResponse();
if (async)
{
int resp = doPublish(message);
if (resp == EventResponse.EVENT_SUCCESS)
{
response.setStatus(EventResponse.EVENT_SUCCESS);
response.setMessage("Event Successfully published to the queue");
}
else
{
response.setStatus(EventResponse.EVENT_FAILURE);
response.setMessage("Failed to publish Event to the queue");
}
}
else
{
doSyncOp(message)
}
return response;
}
private <T> int doPublish(EventMessage<T> message)
{
String routingKey = Constants.ROUTE_KEY_EVENT;
int retVal = EventResponse.EVENT_SUCCESS;
try
{
switch (message.getType())
{
case EventMessage.TYPE_EMAIL:
{
routingKey = Constants.ROUTE_KEY_EMAIL;
this.rabbitTemplate.convertAndSend("eventExchange", routingKey, message);
break;
}
}
}
catch (AmqpException e)
{
retVal = EventResponse.EVENT_FAILURE;
logger.debug("Unable to push to the queue", e);
}
return retVal;
}
// Getters/Setters
}
EmailQueueListener class
@RabbitListener(containerFactory = "rabbitListenerContainerFactory", queues = "queue.email")
public class EmailQueueListener
{
Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private EmailSender emailSender;
@RabbitHandler
void handleEvent(EventMessage<EmailMessage> message)
{
logger.debug("### RabbitHandler Email: Receiving in listener:" + message);
Map<String, EmailMessage> params = message.getParams();
logger.debug("### emailMessage: " + params.get("emailMessage") + " class:" + params.get("emailMessage").getClass());
EmailMessage email = (EmailMessage) params.get("emailMessage");
emailSender.sendEmail(email);
}
// Getters/Setters
}
在调用 params.get("emailMessage") 的那一行是我得到异常和配置的重试次数的地方。我不确定我是否做错了什么。
编辑
这是发布消息的代码
public class EmailHelper
{
@Autowired
private Publisher publisher;
public void sendEmail(String to, String cc, String bcc, String subject, String text, boolean isHtml)
{
EmailMessage emailMessage = new EmailMessage();
emailMessage.setToAddresses(new String[] { to });
if (cc != null && !cc.isEmpty())
{
emailMessage.setCcAddresses(new String[] { cc });
}
if (bcc != null && !bcc.isEmpty())
{
emailMessage.setBccAddresses(new String[] { bcc });
}
emailMessage.setFromAddress("test@mhserver.com");
emailMessage.setSubject(subject);
emailMessage.setMessage(text);
Map<String, EmailMessage> params = new HashMap<>();
params.put("emailMessage", emailMessage);
EventMessage<EmailMessage> evtMsg = new EventMessage<>(EventMessage.TYPE_EMAIL, params);
publisher.<EmailMessage>publishMessage(evtMsg, true);
}
因为EventMessage
是泛型;消息转换器中的默认类型映射器无法处理任意泛型类型。
如果 @RabbitListener
定义在方法级别而不是 class 级别,它将起作用,因为我们可以从侦听器方法参数推断通用类型。
否则,您将需要为消息转换器创建自定义类型映射器。