Spring 检测连接丢失和自动重新连接的集成 tcp-outbound-channel-adapter

Spring Integration tcp-outbound-channel-adapter that detects connection loss and auto-reconnects

我正在尝试做与这里完成的完全相反的事情:

How do I create a tcp-inbound-gateway which detects connection loss and auto-reconnects?

我有一组协作客户端 tcp 适配器取自示例应用程序。通常,应用程序会通过共享连接向第三方发送大量客户端请求,但有时根本没有请求通过。

因此,第 3 方要求我每 2 分钟向他们发送一次 ping 消息。如果在预定义的时间段内没有收到对 ping 的响应(这可能比其他调用的超时时间短,我应该终止连接并重新连接。

我的第一个想法是创建一个计划任务,每 2 分钟发送一次 ping,但我不确定如何终止连接并从任务内部重新建立连接。我正在考虑的另一种选择是使用连接拦截器并为请求和响应计时,但这似乎不对。我是 SI 的新手,所以任何朝着正确方向的推动都会有所帮助。

我刚才的另一个是将 tcp-outbound-channel-adapter 自动装配到作业中并调用 retryConnection()

<converter>
    <beans:bean class="org.springframework.integration.samples.tcpclientserver.ByteArrayToStringConverter" />
</converter>

<!-- Given we are looking for performance, let's use
     the most performant wire protocol. -->

<beans:bean id="fastestWireFormatSerializer" class="org.springframework.integration.ip.tcp.serializer.ByteArrayLengthHeaderSerializer">
    <beans:constructor-arg value="1" />
</beans:bean>

<!-- Client side -->

<gateway id="gw"
    service-interface="org.springframework.integration.samples.tcpclientserver.SimpleGateway"
    default-request-channel="input" />

<ip:tcp-connection-factory id="client"
    type="client"
    host="localhost"
    port="${availableServerSocket}"
    single-use="false"
    serializer="fastestWireFormatSerializer"
    deserializer="fastestWireFormatSerializer"
    so-timeout="10000" />

<publish-subscribe-channel id="input" />

<ip:tcp-outbound-channel-adapter id="outAdapter.client"
    order="2"
    channel="input"
    connection-factory="client" /> <!-- Collaborator -->

<!-- Also send a copy to the custom aggregator for correlation and
     so this message's replyChannel will be transferred to the
     aggregated message.
     The order ensures this gets to the aggregator first -->
<bridge input-channel="input" output-channel="toAggregator.client"
        order="1"/>

<!-- Asynch receive reply -->
<ip:tcp-inbound-channel-adapter id="inAdapter.client"
    channel="toAggregator.client"
    connection-factory="client" /> <!-- Collaborator -->

<!-- dataType attribute invokes the conversion service, if necessary -->
<channel id="toAggregator.client" datatype="java.lang.String" />

<aggregator input-channel="toAggregator.client"
    output-channel="toTransformer.client"
    correlation-strategy-expression="payload.substring(0,3)"
    release-strategy-expression="size() == 2" />

<transformer input-channel="toTransformer.client"
    expression="payload.get(1)"/> <!-- The response is always second -->

更新

我正在使用 SI 3.0。7.RELEASE

我在日志中看到的唯一异常是:

22:02:38.956 错误 [pool-1-thread-1][org.springframework.integration.ip.tcp.connection.TcpNetConnection] 读取异常 localhost:4607:39129:464bd042-dd9c-4639-8d1d-cdda61dc988a SocketTimeoutException:Read 超时

这是当 clientFactory 上的 so-timeout 设置为 10 秒时,我强制服务器休眠 15 秒。 SimpleGateway 调用不会返回任何内容。它只是坐在那里,永远等待。

使用上面列出的配置的示例代码:

String input = "ping";
                String result = null;
                System.out.println("Sending message: " + input);
                try{
                    result = gateway.send(input);
                }catch(Exception e){
                    System.out.println("There was an exception sending the message");
                }
                System.out.println("response: " + result);

输出:

发送消息:测试 22:12:17.093 错误 [pool-1-thread-1][org.springframework.integration.ip.tcp.connection.TcpNetConnection] 读取异常 localhost:4607:39232:fe90d394-edc3-440a-ab68-34e7162db6ec SocketTimeoutException:Read 超时

从不调用网关 returns。

有个new component in 4.2 called Thread Barrier.

这允许您等待一些(可配置的)时间,直到发生异步事件。

有关示例,请参阅 the barrier sample

ping 回复将 'release' 等待线程;在超时的情况下,您可以在处理程序上使用请求处理程序建议来捕获异常并采取您需要的操作(您可以从工厂获取打开的连接列表并通过 id 关闭它们)。

顺便说一句,我们可能会重写该示例以使用屏障而不是聚合器。 确实没有必要重新打开连接(除非您预计在新连接上会有未经请求的响应);它将在下次发送时打开。

编辑:

我需要查看更多您的日志;我刚刚试了一下,得到了以下信息:

2015-09-28 09:05:53,995 [pool-2-thread-1] ERROR: org.springframework.integration.ip.tcp.connection.TcpNetConnection - Read exception localhost:5678:51323:f3537a39-feb4-4237-9508-7e67c1b79654
java.net.SocketTimeoutException: Read timed out
...
2015-09-28 09:05:53,996 [pool-2-thread-1] TRACE: org.springframework.integration.ip.tcp.TcpOutboundGateway - onMessage: localhost:5678:51323:f3537a39-feb4-4237-9508-7e67c1b79654([Payload=java.net.SocketTimeoutException: Read timed out][Headers={timestamp=1443445553996, id=0b3fae63-f431-c09c-ac4e-26ce4b012b6b, ip_connectionId=localhost:5678:51323:f3537a39-feb4-4237-9508-7e67c1b79654}])
2015-09-28 09:05:53,996 [main] DEBUG: org.springframework.integration.ip.tcp.TcpOutboundGateway - second chance
2015-09-28 09:05:55,998 [main] ERROR: org.springframework.integration.ip.tcp.TcpOutboundGateway - Tcp Gateway exception
org.springframework.integration.MessagingException: Exception while awaiting reply
...
Caused by: java.net.SocketTimeoutException: Read timed out
...

然而,它在 4.2(以及一次性插座)中被破坏了,所以我打开了一个 JIRA Issue。但是你的不是一次性的,所以我希望得到和我一样的结果。

编辑 2:

Skipping a socket timeout because we have a recent send localhost:4607:26489:926b1c1f-49cc-4f44-b20d-78f2e94e82d1

我早该提一下了。

因为连接工厂完全支持异步消息传递,我们可能必须等待 2 次超时才能关闭套接字并将异常传播到网关线程。

推理是这样的(使用你的 10 秒 so-timeout):

  1. 连接创建于 t+0;读取线程已启动并在套接字中被阻塞(不可中断)。
  2. 消息已发送至 t+9
  3. 读取在 t+10 处超时。
  4. 现在使用这个超时还为时过早,所以我们忽略它,因为在过去的 10 秒内有一个发送。
  5. t+20 再次读取超时。
  6. 异常传播给调用者。

平均而言,网关线程将在 so-timeout * 1.5 处获得超时,但它将在 so-timeoutso-timeout * 2 的范围内。

编辑 3:

我无法从日志中看出为什么它没有在您的案例中传播。

我刚刚将 this branch 上的 tcp-client-server 示例还原为 3.0.7,并将测试更改为强制超时(在跳过一个之后),这对我来说一切正常。

我们需要弄清楚你的情况和我的情况有什么不同。

您可以在 the last commit on that branch 中看到我对示例所做的更改。

如果你能想出一个类似的重现问题的测试用例,我可以帮你调试。