如何在 Spring Integration Outbound Adapter 中为 Artemis Broker 添加 JmsMessage 延迟
How to add a delay to JmsMessage for Artemis Broker in Spring Integration Outbound Adapter
我有一个 jms 出站通道适配器。我想向我的 Apache Artemis Broker 发送消息并添加延迟。
<int-jms:outbound-channel-adapter connection-factory="scheduledConnectionFactory" channel="tnpScheduledOutboundChannel" destination="tnpScheduledQueue" />
我的 java 将 header 添加到 spring 集成消息的代码是:
return MessageBuilder.withPayload(sdpInfo).setHeader("_AMQ_SCHED_DELIVERY",sdpInfo.getDelay()).build();
header 添加为 spring 集成 header。但它没有被 JMS 接收,即 queue 的消费者会立即看到它。
我需要添加某种 JmsHeaderMapper 吗?有人可以指出一些文档或示例吗?
更新
这是在 JmsSendingMessageHandler 中调试时捕获的消息。
ActiveMQMessage[null]:PERSISTENT/ClientMessageImpl[messageID=0, durable=true, address=null,userID=null,properties=TypedProperties[sequenceNumber=4,file_status=0,sequenceSize=0,_AMQ_SCHED_DELIVERY=14685858,timestamp=1605707114145,correlationId=4d6fa6c8-fdcb-d69b-cd60-d260264545f5]]
如果我直接在激活器中使用一个jmsTemplate
public void sendDelayMessage(SdpInfoTemplate<?> message, long deliveryDelay, String queue) {
jmsTemplateDelay.setDeliveryDelay(deliveryDelay);
jmsTemplateDelay.setDeliveryMode(DeliveryMode.PERSISTENT);
jmsTemplateDelay.convertAndSend("sdp-file-tnp-scheduled-dimitris-test", message, m -> {
m.setLongProperty("_AMQ_SCHED_DELIVERY", deliveryDelay);
m.setStringProperty("TRANSACTION_ID",message.getTransactionId());
return m;
});
关键部分是jmsTemplateDelay.setDeliveryDelay(deliveryDelay);
如果我不设置它会被调度它被忽略并且消费者端立即接收消息
DefaultJmsHeaderMapper
默认用于JmsSendingMessageHandler
。如果值在受支持的类型中,它将所有 header 映射到 jmsMessage.setObjectProperty(propertyName, value);
:
private static final List<Class<?>> SUPPORTED_PROPERTY_TYPES = Arrays.asList(new Class<?>[] {
Boolean.class, Byte.class, Double.class, Float.class
或者你失去了你的 header 介于两者之间...
更新
抱歉,您让我们感到困惑。您的 _AMQ_SCHED_DELIVERY
需要作为 属性 出现在最终的 JMS 消息中。当你谈论 setDeliveryDelay()
时,这是完全不同的故事。
您可以做的是 DynamicJmsTemplate
的扩展。覆盖它的 getDeliveryDelay()
并从您填充的 ThreadLocal
中获取所需的值,然后再将消息发送到 <int-jms:outbound-channel-adapter>
.
这是我们绝对可以添加到 DynamicJmsTemplate
和 DynamicJmsTemplateProperties
中的内容,由提到的 JmsSendingMessageHandler
解决。但是目前,解决方法仅为此 deliveryDelay
选项实现您自己的 ThreadLocal
变量。
我有一个 jms 出站通道适配器。我想向我的 Apache Artemis Broker 发送消息并添加延迟。
<int-jms:outbound-channel-adapter connection-factory="scheduledConnectionFactory" channel="tnpScheduledOutboundChannel" destination="tnpScheduledQueue" />
我的 java 将 header 添加到 spring 集成消息的代码是:
return MessageBuilder.withPayload(sdpInfo).setHeader("_AMQ_SCHED_DELIVERY",sdpInfo.getDelay()).build();
header 添加为 spring 集成 header。但它没有被 JMS 接收,即 queue 的消费者会立即看到它。
我需要添加某种 JmsHeaderMapper 吗?有人可以指出一些文档或示例吗?
更新
这是在 JmsSendingMessageHandler 中调试时捕获的消息。
ActiveMQMessage[null]:PERSISTENT/ClientMessageImpl[messageID=0, durable=true, address=null,userID=null,properties=TypedProperties[sequenceNumber=4,file_status=0,sequenceSize=0,_AMQ_SCHED_DELIVERY=14685858,timestamp=1605707114145,correlationId=4d6fa6c8-fdcb-d69b-cd60-d260264545f5]]
如果我直接在激活器中使用一个jmsTemplate
public void sendDelayMessage(SdpInfoTemplate<?> message, long deliveryDelay, String queue) {
jmsTemplateDelay.setDeliveryDelay(deliveryDelay);
jmsTemplateDelay.setDeliveryMode(DeliveryMode.PERSISTENT);
jmsTemplateDelay.convertAndSend("sdp-file-tnp-scheduled-dimitris-test", message, m -> {
m.setLongProperty("_AMQ_SCHED_DELIVERY", deliveryDelay);
m.setStringProperty("TRANSACTION_ID",message.getTransactionId());
return m;
});
关键部分是jmsTemplateDelay.setDeliveryDelay(deliveryDelay);
如果我不设置它会被调度它被忽略并且消费者端立即接收消息
DefaultJmsHeaderMapper
默认用于JmsSendingMessageHandler
。如果值在受支持的类型中,它将所有 header 映射到 jmsMessage.setObjectProperty(propertyName, value);
:
private static final List<Class<?>> SUPPORTED_PROPERTY_TYPES = Arrays.asList(new Class<?>[] {
Boolean.class, Byte.class, Double.class, Float.class
或者你失去了你的 header 介于两者之间...
更新
抱歉,您让我们感到困惑。您的 _AMQ_SCHED_DELIVERY
需要作为 属性 出现在最终的 JMS 消息中。当你谈论 setDeliveryDelay()
时,这是完全不同的故事。
您可以做的是 DynamicJmsTemplate
的扩展。覆盖它的 getDeliveryDelay()
并从您填充的 ThreadLocal
中获取所需的值,然后再将消息发送到 <int-jms:outbound-channel-adapter>
.
这是我们绝对可以添加到 DynamicJmsTemplate
和 DynamicJmsTemplateProperties
中的内容,由提到的 JmsSendingMessageHandler
解决。但是目前,解决方法仅为此 deliveryDelay
选项实现您自己的 ThreadLocal
变量。