Spring 集成网关 VS 适配器

Spring Integration Gateway VS Adapters

您好,我是 spring 集成方面的新手

我检查了 Spring 集成动态路由的示例。我终于在这里找到了它

Dynamic TCP Client

这里有行

@Component
@MessagingGateway(defaultRequestChannel = "toTcp.input")
public interface TcpClientGateway {
    byte[] send(String data, @Header("host") String host, @Header("port") int port);
}

private MessageChannel createNewSubflow(Message<?> message) {
        String host = (String) message.getHeaders().get("host");
        Integer port = (Integer) message.getHeaders().get("port");
        Assert.state(host != null && port != null, "host and/or port header missing");
        String hostPort = host + port;

        TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory(host, port);
        TcpSendingMessageHandler handler = new TcpSendingMessageHandler();
        handler.setConnectionFactory(cf);
        IntegrationFlow flow = f -> f.handle(handler);
        IntegrationFlowContext.IntegrationFlowRegistration flowRegistration =
                this.flowContext.registration(flow)
                        .addBean(cf)
                        .id(hostPort + ".flow")
                        .register();
        MessageChannel inputChannel = flowRegistration.getInputChannel();
        this.subFlows.put(hostPort, inputChannel);
        return inputChannel;
    }

但我用

改变了它
private MessageChannel createNewSubflow(Message<?> message) {
    String host = (String) message.getHeaders().get("host");
    Integer port = (Integer) message.getHeaders().get("port");
    Assert.state(host != null && port != null, "host and/or port header missing");
    String hostPort = host + port;

    TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory(host, port);
    cf.setLeaveOpen(true);
    //cf.setSingleUse(true);

    ByteArrayCrLfSerializer  byteArrayCrLfSerializer =new ByteArrayCrLfSerializer();
    byteArrayCrLfSerializer.setMaxMessageSize(1048576);

    cf.setSerializer(byteArrayCrLfSerializer);
    cf.setDeserializer(byteArrayCrLfSerializer);

    TcpOutboundGateway tcpOutboundGateway = new TcpOutboundGateway();
    tcpOutboundGateway.setConnectionFactory(cf);

    IntegrationFlow flow = f -> f.handle(tcpOutboundGateway);

    IntegrationFlowContext.IntegrationFlowRegistration flowRegistration =
        this.flowContext.registration(flow)
            .addBean(cf)
            .id(hostPort + ".flow")
            .register();
    MessageChannel inputChannel = flowRegistration.getInputChannel();

    this.subFlows.put(hostPort, inputChannel);
    return inputChannel;
}

使用 request/response 架构。它确实工作得很好,因为它提供动态路由而无需手动创建 tcp 客户端。

此时我需要一些帮助来改进我的方案。我的场景是这样的;

客户端向服务器发送一条消息并从服务器接收该消息的响应,但随后服务器需要向该客户端发送任意消息(就像 GPS 位置更新信息)。当服务器开始向客户端发送这些消息时,会生成如下所示的错误消息

ERROR 54816 --- [pool-2-thread-1] o.s.i.ip.tcp.TcpOutboundGateway : Cannot correlate response - no pending reply for ::58628:62fd67b6-af2d-42f1-9c4d-d232fbe9c8ca

我检查了 spring 集成文档并注意到网关只能与 request/response 一起使用,所以我知道我应该使用适配器,但我不知道我应该如何将适配器与动态 tcp 客户端一起使用。

我在这里找到了类似的主题和一些回复,但无法达到我的目标或找到了组合解决方案的示例。

Spring integration TCP server push to client

您只需要注册两个流;一个用于输入;一个用于输出 - 问题是关联回复的响应,并将任意消息路由到网关以外的某个地方。

我更新了这个用例的示例 on this branch

您可以看到该分支上最后一次提交的更改;大多数更改是为了模拟您的服务器端。

在客户端,我们简单地注册了两个流并使用@ServiceActivator方法来获取入站消息;您可以通过连接 ID 识别它们来自哪个服务器。