Spring Websphere 的集成聚合示例
Spring Integration Aggregation Example with Websphere
问题: 流输入仅适用于发送到输出通道聚合器的 1 个输入 'out'。后续消息仅进入丢弃通道 'logLateArrivers'。发送到丢弃通道的条件是什么?
描述: 尝试使用 WebSphere 使用聚合器为基本 jms 移植 Spring 集成示例。
更新:
- 打开调试显示轮询器正在工作。消息被拉取并放入 MQ,响应被提取。但是对于第一组消息之后的 MQ 场景,不会使用 AggregatingMessageHandler。消息被发送到丢弃通道上的 'logLateArrivers' 适配器与输出通道 'out'。我正在重新措辞问题陈述以使其更具体。
Spring 集成示例:
Spring Integration Github Example
使用 Spring 集成示例的输出:
test1
test2
[TEST1, TEST1]
[TEST2, TEST2]
使用 Spring 与 Websphere 集成的输出
test1
test2
[TEST1, TEST2]
[TEST1, TEST2]
使用 Websphere MQ 移植更改
common.xml
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory">
<bean class="com.ibm.mq.jms.MQConnectionFactory">
<property name="channel" value="channelName" />
<property name="hostName" value="host1234" />
<property name="port" value="1111" />
<property name="queueManager" value="testqmgr" />
<property name="transportType" value="1" />
</bean>
</property>
<property name="sessionCacheSize" value="10"/>
</bean>
<bean id="requestQueue" class="com.ibm.mq.jms.MQQueue">
<constructor-arg value="requestQueue"/>
</bean>
<bean id="requestTopic" class="com.ibm.mq.jms.MQTopic">
<constructor-arg value="topic.demo"/>
</bean>
<bean id="replyQueue" class="com.ibm.mq.jms.MQQueue">
<constructor-arg value="replyQueue"/>
</bean>
<!-- Poller that is the stream in channel for console input -->
<integration:poller id="poller" default="true" fixed-delay="1000"/>
Aggregation.xml
<int-stream:stdin-channel-adapter id="stdin" channel="stdinToJmsoutChannel"/>
<int:channel id="stdinToJmsoutChannel"/>
<int:chain input-channel="stdinToJmsoutChannel">
<int:header-enricher>
<int:header name="jms_replyTo" ref="replyQueue" />
</int:header-enricher>
<int-jms:outbound-channel-adapter destination="requestTopic" />
</int:chain>
<int-jms:message-driven-channel-adapter channel="jmsReplyChannel"
destination="replyQueue"/>
<int:channel id="jmsReplyChannel" />
<int:aggregator input-channel="jmsReplyChannel" output-channel="out"
group-timeout="5000"
expire-groups-upon-timeout="false"
send-partial-result-on-expiry="true"
discard-channel="logLateArrivers"
correlation-strategy-expression="headers['jms_correlationId']"
release-strategy-expression="size() == 2"/>
<int:logging-channel-adapter id="logLateArrivers" />
<!-- Subscribers -->
<int-jms:inbound-gateway request-channel="upcase" request-destination="requestTopic" />
<int-jms:inbound-gateway request-channel="upcase" request-destination="requestTopic" />
<int:transformer input-channel="upcase" expression="payload.toUpperCase()" />
<!-- Profiles -->
<beans profile="default">
<int-stream:stdout-channel-adapter id="out" append-newline="true"/>
</beans>
<beans profile="testCase">
<int:bridge input-channel="out" output-channel="queueChannel"/>
<int:channel id="queueChannel">
<int:queue />
</int:channel>
</beans>
消息应在 jms_correlationId
上关联。打开 DEBUG 日志记录并比较示例和您的版本之间的消息流。可能相关 ID 设置不正确。
入站网关使用此逻辑...
replyMessage.setJMSCorrelationID(requestMessage.getJMSMessageID());
因此,与每个请求关联的消息在发送到聚合器时应该得到相同的jms_correlationId
。
您的测试表明这两封邮件不知何故具有相同的邮件 ID。
编辑
具有相同相关 ID(在本例中为 headers['jms_correlationId']
)的消息将被丢弃(迟到者),除非 expire-groups-upon-completion="true"
- 这允许一个新组开始而不是丢弃。您需要弄清楚为什么第二组与第一组具有相同的相关 ID。
问题: 流输入仅适用于发送到输出通道聚合器的 1 个输入 'out'。后续消息仅进入丢弃通道 'logLateArrivers'。发送到丢弃通道的条件是什么?
描述: 尝试使用 WebSphere 使用聚合器为基本 jms 移植 Spring 集成示例。
更新: - 打开调试显示轮询器正在工作。消息被拉取并放入 MQ,响应被提取。但是对于第一组消息之后的 MQ 场景,不会使用 AggregatingMessageHandler。消息被发送到丢弃通道上的 'logLateArrivers' 适配器与输出通道 'out'。我正在重新措辞问题陈述以使其更具体。
Spring 集成示例: Spring Integration Github Example
使用 Spring 集成示例的输出:
test1
test2
[TEST1, TEST1]
[TEST2, TEST2]
使用 Spring 与 Websphere 集成的输出
test1
test2
[TEST1, TEST2]
[TEST1, TEST2]
使用 Websphere MQ 移植更改
common.xml
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <property name="targetConnectionFactory"> <bean class="com.ibm.mq.jms.MQConnectionFactory"> <property name="channel" value="channelName" /> <property name="hostName" value="host1234" /> <property name="port" value="1111" /> <property name="queueManager" value="testqmgr" /> <property name="transportType" value="1" /> </bean> </property> <property name="sessionCacheSize" value="10"/> </bean> <bean id="requestQueue" class="com.ibm.mq.jms.MQQueue"> <constructor-arg value="requestQueue"/> </bean> <bean id="requestTopic" class="com.ibm.mq.jms.MQTopic"> <constructor-arg value="topic.demo"/> </bean> <bean id="replyQueue" class="com.ibm.mq.jms.MQQueue"> <constructor-arg value="replyQueue"/> </bean> <!-- Poller that is the stream in channel for console input --> <integration:poller id="poller" default="true" fixed-delay="1000"/>
Aggregation.xml
<int-stream:stdin-channel-adapter id="stdin" channel="stdinToJmsoutChannel"/> <int:channel id="stdinToJmsoutChannel"/> <int:chain input-channel="stdinToJmsoutChannel"> <int:header-enricher> <int:header name="jms_replyTo" ref="replyQueue" /> </int:header-enricher> <int-jms:outbound-channel-adapter destination="requestTopic" /> </int:chain> <int-jms:message-driven-channel-adapter channel="jmsReplyChannel" destination="replyQueue"/> <int:channel id="jmsReplyChannel" /> <int:aggregator input-channel="jmsReplyChannel" output-channel="out" group-timeout="5000" expire-groups-upon-timeout="false" send-partial-result-on-expiry="true" discard-channel="logLateArrivers" correlation-strategy-expression="headers['jms_correlationId']" release-strategy-expression="size() == 2"/> <int:logging-channel-adapter id="logLateArrivers" /> <!-- Subscribers --> <int-jms:inbound-gateway request-channel="upcase" request-destination="requestTopic" /> <int-jms:inbound-gateway request-channel="upcase" request-destination="requestTopic" /> <int:transformer input-channel="upcase" expression="payload.toUpperCase()" /> <!-- Profiles --> <beans profile="default"> <int-stream:stdout-channel-adapter id="out" append-newline="true"/> </beans> <beans profile="testCase"> <int:bridge input-channel="out" output-channel="queueChannel"/> <int:channel id="queueChannel"> <int:queue /> </int:channel> </beans>
消息应在 jms_correlationId
上关联。打开 DEBUG 日志记录并比较示例和您的版本之间的消息流。可能相关 ID 设置不正确。
入站网关使用此逻辑...
replyMessage.setJMSCorrelationID(requestMessage.getJMSMessageID());
因此,与每个请求关联的消息在发送到聚合器时应该得到相同的jms_correlationId
。
您的测试表明这两封邮件不知何故具有相同的邮件 ID。
编辑
具有相同相关 ID(在本例中为 headers['jms_correlationId']
)的消息将被丢弃(迟到者),除非 expire-groups-upon-completion="true"
- 这允许一个新组开始而不是丢弃。您需要弄清楚为什么第二组与第一组具有相同的相关 ID。