Spring 集成网关 VS 适配器
Spring Integration Gateway VS Adapters
您好,我是 spring 集成方面的新手
我检查了 Spring 集成动态路由的示例。我终于在这里找到了它
这里有行
@Component
@MessagingGateway(defaultRequestChannel = "toTcp.input")
public interface TcpClientGateway {
byte[] send(String data, @Header("host") String host, @Header("port") int port);
}
private MessageChannel createNewSubflow(Message<?> message) {
String host = (String) message.getHeaders().get("host");
Integer port = (Integer) message.getHeaders().get("port");
Assert.state(host != null && port != null, "host and/or port header missing");
String hostPort = host + port;
TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory(host, port);
TcpSendingMessageHandler handler = new TcpSendingMessageHandler();
handler.setConnectionFactory(cf);
IntegrationFlow flow = f -> f.handle(handler);
IntegrationFlowContext.IntegrationFlowRegistration flowRegistration =
this.flowContext.registration(flow)
.addBean(cf)
.id(hostPort + ".flow")
.register();
MessageChannel inputChannel = flowRegistration.getInputChannel();
this.subFlows.put(hostPort, inputChannel);
return inputChannel;
}
但我用
改变了它
private MessageChannel createNewSubflow(Message<?> message) {
String host = (String) message.getHeaders().get("host");
Integer port = (Integer) message.getHeaders().get("port");
Assert.state(host != null && port != null, "host and/or port header missing");
String hostPort = host + port;
TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory(host, port);
cf.setLeaveOpen(true);
//cf.setSingleUse(true);
ByteArrayCrLfSerializer byteArrayCrLfSerializer =new ByteArrayCrLfSerializer();
byteArrayCrLfSerializer.setMaxMessageSize(1048576);
cf.setSerializer(byteArrayCrLfSerializer);
cf.setDeserializer(byteArrayCrLfSerializer);
TcpOutboundGateway tcpOutboundGateway = new TcpOutboundGateway();
tcpOutboundGateway.setConnectionFactory(cf);
IntegrationFlow flow = f -> f.handle(tcpOutboundGateway);
IntegrationFlowContext.IntegrationFlowRegistration flowRegistration =
this.flowContext.registration(flow)
.addBean(cf)
.id(hostPort + ".flow")
.register();
MessageChannel inputChannel = flowRegistration.getInputChannel();
this.subFlows.put(hostPort, inputChannel);
return inputChannel;
}
使用 request/response 架构。它确实工作得很好,因为它提供动态路由而无需手动创建 tcp 客户端。
此时我需要一些帮助来改进我的方案。我的场景是这样的;
客户端向服务器发送一条消息并从服务器接收该消息的响应,但随后服务器需要向该客户端发送任意消息(就像 GPS 位置更新信息)。当服务器开始向客户端发送这些消息时,会生成如下所示的错误消息
ERROR 54816 --- [pool-2-thread-1] o.s.i.ip.tcp.TcpOutboundGateway : Cannot correlate response - no pending reply for ::58628:62fd67b6-af2d-42f1-9c4d-d232fbe9c8ca
我检查了 spring 集成文档并注意到网关只能与 request/response 一起使用,所以我知道我应该使用适配器,但我不知道我应该如何将适配器与动态 tcp 客户端一起使用。
我在这里找到了类似的主题和一些回复,但无法达到我的目标或找到了组合解决方案的示例。
Spring integration TCP server push to client
您只需要注册两个流;一个用于输入;一个用于输出 - 问题是关联回复的响应,并将任意消息路由到网关以外的某个地方。
我更新了这个用例的示例 on this branch。
您可以看到该分支上最后一次提交的更改;大多数更改是为了模拟您的服务器端。
在客户端,我们简单地注册了两个流并使用@ServiceActivator
方法来获取入站消息;您可以通过连接 ID 识别它们来自哪个服务器。
您好,我是 spring 集成方面的新手
我检查了 Spring 集成动态路由的示例。我终于在这里找到了它
这里有行
@Component
@MessagingGateway(defaultRequestChannel = "toTcp.input")
public interface TcpClientGateway {
byte[] send(String data, @Header("host") String host, @Header("port") int port);
}
private MessageChannel createNewSubflow(Message<?> message) {
String host = (String) message.getHeaders().get("host");
Integer port = (Integer) message.getHeaders().get("port");
Assert.state(host != null && port != null, "host and/or port header missing");
String hostPort = host + port;
TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory(host, port);
TcpSendingMessageHandler handler = new TcpSendingMessageHandler();
handler.setConnectionFactory(cf);
IntegrationFlow flow = f -> f.handle(handler);
IntegrationFlowContext.IntegrationFlowRegistration flowRegistration =
this.flowContext.registration(flow)
.addBean(cf)
.id(hostPort + ".flow")
.register();
MessageChannel inputChannel = flowRegistration.getInputChannel();
this.subFlows.put(hostPort, inputChannel);
return inputChannel;
}
但我用
改变了它private MessageChannel createNewSubflow(Message<?> message) {
String host = (String) message.getHeaders().get("host");
Integer port = (Integer) message.getHeaders().get("port");
Assert.state(host != null && port != null, "host and/or port header missing");
String hostPort = host + port;
TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory(host, port);
cf.setLeaveOpen(true);
//cf.setSingleUse(true);
ByteArrayCrLfSerializer byteArrayCrLfSerializer =new ByteArrayCrLfSerializer();
byteArrayCrLfSerializer.setMaxMessageSize(1048576);
cf.setSerializer(byteArrayCrLfSerializer);
cf.setDeserializer(byteArrayCrLfSerializer);
TcpOutboundGateway tcpOutboundGateway = new TcpOutboundGateway();
tcpOutboundGateway.setConnectionFactory(cf);
IntegrationFlow flow = f -> f.handle(tcpOutboundGateway);
IntegrationFlowContext.IntegrationFlowRegistration flowRegistration =
this.flowContext.registration(flow)
.addBean(cf)
.id(hostPort + ".flow")
.register();
MessageChannel inputChannel = flowRegistration.getInputChannel();
this.subFlows.put(hostPort, inputChannel);
return inputChannel;
}
使用 request/response 架构。它确实工作得很好,因为它提供动态路由而无需手动创建 tcp 客户端。
此时我需要一些帮助来改进我的方案。我的场景是这样的;
客户端向服务器发送一条消息并从服务器接收该消息的响应,但随后服务器需要向该客户端发送任意消息(就像 GPS 位置更新信息)。当服务器开始向客户端发送这些消息时,会生成如下所示的错误消息
ERROR 54816 --- [pool-2-thread-1] o.s.i.ip.tcp.TcpOutboundGateway : Cannot correlate response - no pending reply for ::58628:62fd67b6-af2d-42f1-9c4d-d232fbe9c8ca
我检查了 spring 集成文档并注意到网关只能与 request/response 一起使用,所以我知道我应该使用适配器,但我不知道我应该如何将适配器与动态 tcp 客户端一起使用。
我在这里找到了类似的主题和一些回复,但无法达到我的目标或找到了组合解决方案的示例。
您只需要注册两个流;一个用于输入;一个用于输出 - 问题是关联回复的响应,并将任意消息路由到网关以外的某个地方。
我更新了这个用例的示例 on this branch。
您可以看到该分支上最后一次提交的更改;大多数更改是为了模拟您的服务器端。
在客户端,我们简单地注册了两个流并使用@ServiceActivator
方法来获取入站消息;您可以通过连接 ID 识别它们来自哪个服务器。