Spring AMQP RabbitMQ Object sent as one type gets converted to Map in Listener

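In my application, RabbitTemplate sends an object (EventMessage, code below) to a queue. However, in the RabbitListener/RabbitHandler, the EmailMessage object contained in the EventMessage is converted to a LinkedHashMap during deserialization.

The MessageProperties, however, show that the type is EventMessage: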

2020-09-23 18:39:47.712  WARN 16676 --- [ntContainer#0-1] o.s.a.r.r.RejectAndDontRequeueRecoverer  : Retries exhausted for message (Body:'{"type":101,"params":{"emailMessage":{"hasTo":true,"hasCc":false,"hasBcc":false,"template":null,"templateParams":null,"html":false,"toAddresses":["test@test.com"],"ccAddresses":null,"bccAddresses":null,"fromAddress":"test@mhserver.com","subject":"Test Subject","message":"An email has been sent to your registered email to reset your password","isHtml":false}}}' MessageProperties [headers={__TypeId__=in.teamnexus.mq.EventMessage}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=eventExchange, receivedRoutingKey=route.key.email, deliveryTag=1, consumerTag=amq.ctag-eCfxhUrcjiCF4lA7x8ZLNg, consumerQueue=queue.email])

Caused by: java.lang.ClassCastException: class java.util.LinkedHashMap cannot be cast to class in.teamnexus.email.EmailMessage (java.util.LinkedHashMap is in module java.base of loader 'bootstrap'; in.teamnexus.email.EmailMessage is in unnamed module of loader org.springframework.boot.devtools.restart.classloader.RestartClassLoader @ac49aa4)
    at in.teamnexus.mq.EmailQueueListener.handleEvent(EmailQueueListener.java:34) ~[classes/:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:564) ~[na:na] 

Here is the RabbitConfig:

@Configuration
@EnableRabbit
@PropertySource("classpath:custom.properties")
public class RabbitConfig
{

    @Bean
    public ConnectionFactory rabbitConnectionFactory()
    {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(rabbitMQhost);
        connectionFactory.setUsername(rabbitMQUsername);
        connectionFactory.setPassword(rabbitMQPassword);
        connectionFactory.setVirtualHost(rabbitMQVirtualHost);
        connectionFactory.setPort(rabbitMQPort);
        return connectionFactory;
    }

    @Bean
    public MessageConverter messageConverter()
    {
        Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter(new ObjectMapper());
        return messageConverter;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory()
    {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setMessageConverter(messageConverter());
        factory.setConnectionFactory(rabbitConnectionFactory());
        factory.setConcurrentConsumers(20);
        factory.setPrefetchCount(1);
        factory.setMaxConcurrentConsumers(100);
        factory.setAdviceChain(retryInterceptor());
        return factory;
    }

    @Bean
    public RetryOperationsInterceptor retryInterceptor()
    {
        return RetryInterceptorBuilder.stateless().maxAttempts(5).backOffOptions(1000, 2.0, 10000)
                .recoverer(new RejectAndDontRequeueRecoverer()).build();
    }

    @Bean
    public Queue mailQueue()
    {
        return new Queue("queue.email", true);
    }

    @Bean
    public RabbitTemplate rabbitTemplate()
    {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory());
        rabbitTemplate.setMessageConverter(messageConverter());
        return rabbitTemplate;
    }

    @Bean
    public AmqpAdmin amqpAdmin()
    {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(rabbitConnectionFactory());
        return rabbitAdmin;
    }

    @Bean
    public Exchange exchange()
    {
        AmqpAdmin rabbitAdmin = amqpAdmin();
        DirectExchange dirExchange = new DirectExchange("eventExchange", true, false);

        rabbitAdmin.declareExchange(dirExchange);
        rabbitAdmin.declareQueue(mailQueue());
        Binding emailBinding = BindingBuilder.bind(mailQueue()).to(dirExchange).with(Constants.ROUTE_KEY_EMAIL);
        rabbitAdmin.declareBinding(emailBinding);
        // rabbitAdmin.declareBinding(retryBinding); // retryBinding is not defined in this snippet
        return dirExchange;
    }

    @Bean
    Publisher publisher()
    {
        PublisherImpl publisher = new PublisherImpl();
        return publisher;
    }

    @Bean
    EmailQueueListener emailQueueListener()
    {
        return new EmailQueueListener();
    }

}

EventMessage class

public class EventMessage<T> implements Serializable
{
    public static final int TYPE_EMAIL = 101;

    /**
     * 
     */
    private static final long serialVersionUID = 1846120191276045453L;
    
    @JsonProperty("type")
    private int type;
    
    @JsonProperty("params")
    private Map<String, T> params;


    public EventMessage()
    {

    }

    public EventMessage(int type, Map<String, T> params)
    {
        this.type = type;
        this.params = params;
    }
    
    // Getters Setters...
}

PublisherImpl class

public class PublisherImpl implements Publisher
{
    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public <T> EventResponse publishMessage(EventMessage<T> message, boolean async)
    {
        EventResponse response = new EventResponse();
        if (async)
        {
            int resp = doPublish(message);
            if (resp == EventResponse.EVENT_SUCCESS)
            {
                response.setStatus(EventResponse.EVENT_SUCCESS);
                response.setMessage("Event Successfully published to the queue");
            }
            else
            {
                response.setStatus(EventResponse.EVENT_FAILURE);
                response.setMessage("Failed to publish Event to the queue");
            }

        }
        else
        {
            doSyncOp(message);
        }
        return response;
    }

    private <T> int doPublish(EventMessage<T> message)
    {
        String routingKey = Constants.ROUTE_KEY_EVENT;
        int retVal = EventResponse.EVENT_SUCCESS;
        try
        {
            switch (message.getType())
            {
                case EventMessage.TYPE_EMAIL:
                {
                    routingKey = Constants.ROUTE_KEY_EMAIL;
                    this.rabbitTemplate.convertAndSend("eventExchange", routingKey, message);
                    break;
                }
            }
        }
        catch (AmqpException e)
        {
            retVal = EventResponse.EVENT_FAILURE;
            logger.debug("Unable to push to the queue", e);
        }
        return retVal;
    }

    // Getters/Setters
}

EmailQueueListener class

@RabbitListener(containerFactory = "rabbitListenerContainerFactory", queues = "queue.email")
public class EmailQueueListener
{
    Logger logger = LoggerFactory.getLogger(getClass());
    @Autowired
    private EmailSender emailSender;

    @RabbitHandler
    void handleEvent(EventMessage<EmailMessage> message)
    {
        logger.debug("### RabbitHandler Email: Receiving in listener:" + message);
        Map<String, EmailMessage> params = message.getParams();
        logger.debug("### emailMessage: " + params.get("emailMessage") + " class:" + params.get("emailMessage").getClass());
        EmailMessage email = (EmailMessage) params.get("emailMessage");
        emailSender.sendEmail(email);
    }

    // Getters/Setters

}

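The exception is thrown at the line that calls params.get("emailMessage"), and it recurs for the configured number of retries. I'm not sure whether I'm doing something wrong.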

EDIT

Here is the code that publishes the message:

public class EmailHelper
{
    @Autowired
    private Publisher publisher;

    public void sendEmail(String to, String cc, String bcc, String subject, String text, boolean isHtml)
    {
        EmailMessage emailMessage = new EmailMessage();
        emailMessage.setToAddresses(new String[] { to });
        if (cc != null && !cc.isEmpty())
        {
            emailMessage.setCcAddresses(new String[] { cc });
        }
        if (bcc != null && !bcc.isEmpty())
        {
            emailMessage.setBccAddresses(new String[] { bcc });
        }
        emailMessage.setFromAddress("test@mhserver.com");
        emailMessage.setSubject(subject);
        emailMessage.setMessage(text);
        Map<String, EmailMessage> params = new HashMap<>();
        params.put("emailMessage", emailMessage);
        EventMessage<EmailMessage> evtMsg = new EventMessage<>(EventMessage.TYPE_EMAIL, params);
        publisher.<EmailMessage>publishMessage(evtMsg, true);
    }
}

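This happens because EventMessage is generic; the default type mapper in the message converter can't handle arbitrary generic types. The __TypeId__ header carries only the raw class name (in.teamnexus.mq.EventMessage), so Jackson has no type information for T and falls back to a LinkedHashMap for each value in params.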
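
To see why the failure only surfaces at params.get("emailMessage") and not during conversion, here is a small stand-alone sketch of type erasure using only the JDK. The class ErasureDemo, its nested EmailMessage stand-in, and the method names are hypothetical and exist only for this illustration; the sketch imitates a deserializer that knows the raw class but not T, and it does not use Spring or Jackson.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ErasureDemo {

    /** Hypothetical stand-in for the question's EmailMessage class. */
    static class EmailMessage {
        String subject;
    }

    /**
     * Imitates a deserializer that has no type information for T:
     * nested JSON objects end up as LinkedHashMap values.
     */
    @SuppressWarnings("unchecked")
    static <T> Map<String, T> deserializeWithoutTypeInfo() {
        Map<String, Object> nested = new LinkedHashMap<>();
        nested.put("subject", "Test Subject");

        Map<String, Object> params = new LinkedHashMap<>();
        params.put("emailMessage", nested);

        // Unchecked cast: it succeeds at runtime because T is erased.
        return (Map<String, T>) (Map<String, ?>) params;
    }

    /** Returns true if reading the value as EmailMessage throws ClassCastException. */
    static boolean readFailsWithClassCast() {
        // No error here: the map is only "typed" at compile time.
        Map<String, EmailMessage> params = deserializeWithoutTypeInfo();
        try {
            // The compiler inserts a cast to EmailMessage on this assignment.
            EmailMessage email = params.get("emailMessage");
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("ClassCastException on read: " + readFailsWithClassCast());
    }
}
```

The map is created and handed over without complaint; the ClassCastException appears only when a value is read as EmailMessage, which matches the stack trace pointing at the handler method and not at the converter.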

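If @RabbitListener is placed on the method instead of the class, it will work, because the framework can then infer the generic type from the listener method's parameter.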
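
A minimal sketch of the method-level variant, reusing the classes from the question. The package names are taken from the log output above, except for the package of EmailSender, which is an assumption; the rest of the configuration (the container factory and the emailQueueListener bean) is assumed to stay unchanged.

```java
package in.teamnexus.mq;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;

import in.teamnexus.email.EmailMessage;
import in.teamnexus.email.EmailSender; // assumed package, not shown in the question

public class EmailQueueListener
{
    @Autowired
    private EmailSender emailSender;

    // @RabbitListener moved from the class to the method; @RabbitHandler is no longer used.
    // The parameter type EventMessage<EmailMessage> is what the converter can infer from.
    @RabbitListener(containerFactory = "rabbitListenerContainerFactory", queues = "queue.email")
    public void handleEvent(EventMessage<EmailMessage> message)
    {
        // With the generic type known, the values should already be EmailMessage instances.
        EmailMessage email = message.getParams().get("emailMessage");
        emailSender.sendEmail(email);
    }
}
```

This keeps one listener method per queue. If several payload types arrive on the same queue and need @RabbitHandler dispatch, the custom type mapper route is the alternative.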

Otherwise, you will need to create a custom type mapper for the message converter.
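
One possible shape for such a mapper, as a sketch and not a definitive implementation: it extends DefaultJackson2JavaTypeMapper and returns a parametric type whenever the __TypeId__ header names EventMessage. The class name EventMessageTypeMapper and the newConverter() helper are hypothetical, and hard-coding EmailMessage as the type argument is an assumption that only holds while queue.email carries nothing else.

```java
package in.teamnexus.mq;

import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.TypeFactory;

import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.DefaultJackson2JavaTypeMapper;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;

import in.teamnexus.email.EmailMessage;

// Hypothetical mapper: resolves the raw EventMessage type id to EventMessage<EmailMessage>.
public class EventMessageTypeMapper extends DefaultJackson2JavaTypeMapper
{
    // Header name as it appears in the logged MessageProperties.
    private static final String TYPE_ID_HEADER = "__TypeId__";

    private static final JavaType EMAIL_EVENT = TypeFactory.defaultInstance()
            .constructParametricType(EventMessage.class, EmailMessage.class);

    @Override
    public JavaType toJavaType(MessageProperties properties)
    {
        Object typeId = properties.getHeaders().get(TYPE_ID_HEADER);
        if (EventMessage.class.getName().equals(typeId))
        {
            // Assumption: every EventMessage handled here wraps EmailMessage values.
            return EMAIL_EVENT;
        }
        // Anything else falls back to the default header-based resolution.
        return super.toJavaType(properties);
    }

    // Hypothetical helper that builds a converter using this mapper.
    public static MessageConverter newConverter()
    {
        EventMessageTypeMapper typeMapper = new EventMessageTypeMapper();
        // A replacement mapper may not trust application packages by default,
        // so they are listed explicitly for the fallback path.
        typeMapper.setTrustedPackages("in.teamnexus.mq", "in.teamnexus.email");

        Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter(new ObjectMapper());
        converter.setJavaTypeMapper(typeMapper);
        return converter;
    }
}
```

The messageConverter() bean in RabbitConfig could then return EventMessageTypeMapper.newConverter(). If EventMessage later carries other payload types, the mapper would need something to select on, for example the routing key available from the MessageProperties.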