ChannelResolutionException:没有 output-channel 或 replyChannel header 可用 - 仅适用于许多请求

ChannelResolutionException: no output-channel or replyChannel header available - Only with many requests

我是 运行 Spring 集成 TCP 多路复用示例的客户端部分。我试图查看它一次可以处理多少个请求,大约 1000 个,我开始收到此错误:ChannelResolutionException: no output-channel or replyChannel header available

大约 1000 次调用以下一切正常。

<beans:description>
        Uses conversion service and collaborating channel adapters.
    </beans:description>

    <context:property-placeholder />

    <converter>
        <beans:bean class="org.springframework.integration.samples.tcpclientserver.ByteArrayToStringConverter" />
    </converter>

    <!-- Fastest Wire Protocol - takes a byte array with its length definied in the first x bytes-->
    <beans:bean id="fastestWireFormatSerializer" class="org.springframework.integration.ip.tcp.serializer.ByteArrayLengthHeaderSerializer">
        <beans:constructor-arg value="1" />
    </beans:bean>

    <!-- Client side -->

    <gateway id="gw"
        service-interface="org.springframework.integration.samples.tcpclientserver.SimpleGateway"
        default-request-channel="input" />

    <ip:tcp-connection-factory id="client"
        type="client"
        host="localhost"
        port="${availableServerSocket}"
        single-use="false"
        serializer="fastestWireFormatSerializer"
        deserializer="fastestWireFormatSerializer"
        so-timeout="10000" />

    <publish-subscribe-channel id="input" />

    <!-- scheduler - Thread used to restablish connection so the other threads aren't starved while waiting to re-establish connection -->
    <!-- client-mode - Automatically re-establishes the connection if lost -->
    <ip:tcp-outbound-channel-adapter id="outAdapter.client"
        order="2"
        channel="input"
        client-mode="true"              
        connection-factory="client" />  <!-- Collaborator -->



    <!-- Also send a copy to the custom aggregator for correlation and
         so this message's replyChannel will be transferred to the
         aggregated message.
         The order ensures this gets to the aggregator first -->
    <bridge input-channel="input" output-channel="toAggregator.client"
            order="1"/>

    <!-- Asynch receive reply -->
    <ip:tcp-inbound-channel-adapter id="inAdapter.client"
        channel="toAggregator.client"
        connection-factory="client" /> <!-- Collaborator -->

    <!-- dataType attribute invokes the conversion service, if necessary -->
    <channel id="toAggregator.client" datatype="java.lang.String" />

    <aggregator input-channel="toAggregator.client"
        output-channel="toTransformer.client"
        correlation-strategy-expression="payload.substring(0,3)"
        release-strategy-expression="size() == 2"
        expire-groups-upon-completion="true" />

    <transformer input-channel="toTransformer.client"
        expression="payload.get(1)"/> <!-- The response is always second -->

    <task:scheduler id="reconnectScheduler" pool-size="10"/>

以及用于测试的代码:

        TaskExecutor executor = new SimpleAsyncTaskExecutor();
        final CountDownLatch latch = new CountDownLatch(100);
        final Set<Integer> results = new HashSet<Integer>();
        for (int i = 100; i < 1050; i++) {
            results.add(i);
            final int j = i;
            executor.execute(new Runnable() {
                public void run() {
                    String result = gateway.send(j + "Hello world!"); // first 3 bytes is correlationid
                    System.out.println("Test Result: " + result);
                    results.remove(j);
                    latch.countDown();
                }});
        }

我还没有完全弄明白为什么你会得到那个异常,但是你的测试有几个问题。

  1. 倒数锁存器需要在 950 处初始化
  2. 由于你超过了999,我们需要改变相关性:

    payload.substring(0,4)

经过这些更改,它对我有用。

等我有更多时间时,我会试着弄清楚为什么我们会遇到这个异常。

编辑

问题确实是由冲突的相关 ID 引起的。

最后 50 条消息都有关联 ID 100,这意味着消息以不确定的方式发布(假设发布是基于大小)。在某些情况下,会释放两个输入消息(导致对测试用例的错误回复)。当发布2条回复时;没有输出通道。