Spring Websphere 的集成聚合示例

Spring Integration Aggregation Example with Websphere

问题: 流输入仅适用于发送到输出通道聚合器的 1 个输入 'out'。后续消息仅进入丢弃通道 'logLateArrivers'。发送到丢弃通道的条件是什么?

描述: 尝试使用 WebSphere 使用聚合器为基本 jms 移植 Spring 集成示例。

更新: - 打开调试显示轮询器正在工作。消息被拉取并放入 MQ,响应被提取。但是对于第一组消息之后的 MQ 场景,不会使用 AggregatingMessageHandler。消息被发送到丢弃通道上的 'logLateArrivers' 适配器与输出通道 'out'。我正在重新措辞问题陈述以使其更具体。

Spring 集成示例: Spring Integration Github Example

使用 Spring 集成示例的输出:

test1
test2
[TEST1, TEST1]
[TEST2, TEST2]

使用 Spring 与 Websphere 集成的输出

test1
test2
[TEST1, TEST2]
[TEST1, TEST2]

使用 Websphere MQ 移植更改

  1. common.xml

    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory">
            <bean class="com.ibm.mq.jms.MQConnectionFactory">
                <property name="channel" value="channelName" />
                <property name="hostName" value="host1234" />
                <property name="port" value="1111" />
                <property name="queueManager" value="testqmgr" />
                <property name="transportType" value="1" /> 
            </bean>
        </property>
        <property name="sessionCacheSize" value="10"/>
    </bean>
    
    <bean id="requestQueue" class="com.ibm.mq.jms.MQQueue">
        <constructor-arg value="requestQueue"/>
    </bean>
    
    <bean id="requestTopic" class="com.ibm.mq.jms.MQTopic">
        <constructor-arg value="topic.demo"/>
    </bean>
    
    <bean id="replyQueue" class="com.ibm.mq.jms.MQQueue">
        <constructor-arg value="replyQueue"/>
    </bean>
    
    <!-- Poller that is the stream in channel for console input -->
    <integration:poller id="poller" default="true" fixed-delay="1000"/>
    

  2. Aggregation.xml

    <int-stream:stdin-channel-adapter id="stdin" channel="stdinToJmsoutChannel"/>
    
    <int:channel id="stdinToJmsoutChannel"/>
    
    <int:chain input-channel="stdinToJmsoutChannel">
        <int:header-enricher>
            <int:header name="jms_replyTo" ref="replyQueue" />
        </int:header-enricher>
        <int-jms:outbound-channel-adapter destination="requestTopic" />
    </int:chain>
    
    <int-jms:message-driven-channel-adapter channel="jmsReplyChannel"
        destination="replyQueue"/>
    
    <int:channel id="jmsReplyChannel" />
    
    <int:aggregator input-channel="jmsReplyChannel" output-channel="out"
        group-timeout="5000"
        expire-groups-upon-timeout="false"
        send-partial-result-on-expiry="true"
        discard-channel="logLateArrivers"
        correlation-strategy-expression="headers['jms_correlationId']"
        release-strategy-expression="size() == 2"/>
    
    <int:logging-channel-adapter id="logLateArrivers" />
    
    <!-- Subscribers -->
    
    <int-jms:inbound-gateway request-channel="upcase" request-destination="requestTopic" />
    
    <int-jms:inbound-gateway request-channel="upcase" request-destination="requestTopic" />
    
    <int:transformer input-channel="upcase" expression="payload.toUpperCase()" />
    
    <!-- Profiles -->
    
    <beans profile="default">
    
        <int-stream:stdout-channel-adapter id="out" append-newline="true"/>
    
    </beans>
    
    <beans profile="testCase">
    
        <int:bridge input-channel="out" output-channel="queueChannel"/>
    
        <int:channel id="queueChannel">
            <int:queue />
        </int:channel>
    
    </beans>
    

消息应在 jms_correlationId 上关联。打开 DEBUG 日志记录并比较示例和您的版本之间的消息流。可能相关 ID 设置不正确。

入站网关使用此逻辑...

replyMessage.setJMSCorrelationID(requestMessage.getJMSMessageID());

因此,与每个请求关联的消息在发送到聚合器时应该得到相同的jms_correlationId

您的测试表明这两封邮件不知何故具有相同的邮件 ID。

编辑

具有相同相关 ID(在本例中为 headers['jms_correlationId'])的消息将被丢弃(迟到者),除非 expire-groups-upon-completion="true" - 这允许一个新组开始而不是丢弃。您需要弄清楚为什么第二组与第一组具有相同的相关 ID。