Spring 集成 TcpOutboundGateway、ServiceActivator、消息通道和错误 MessageDispatchingException:Dispatcher 没有订阅者

Spring Integration TcpOutboundGateway, ServiceActivator, Message Channel and Error MessageDispatchingException: Dispatcher has no subscribers

正如@Gary Russell 在此 中提到的,我扩展了 TcpOutboundGateway 以在没有任何请求的情况下接收来自 TCP 服务器的消息。

这里是我的自定义 TcpOutboundGateway,如果消息负载包含“freqID”,那么它将消息发送到 MessageChannel

    public class ExtendedTcpOutboundGateway extends TcpOutboundGateway {

    private final DirectChannel unsolicitedMessageChannel;


    public ExtendedTcpOutboundGateway(DirectChannel unsolicitedMessageChannel) {
        this.unsolicitedMessageChannel = unsolicitedMessageChannel;
    }

    @Override
    public boolean onMessage(Message<?> message) {
        if (isUnsolicitedMessage((Message<byte[]>) message)) {
            this.messagingTemplate.send(this.unsolicitedMessageChannel, message);
            return false;
        } else {
            return super.onMessage(message);
        }
    }

    private boolean isUnsolicitedMessage(Message<byte[]> message) {
        byte[] payloadByte = message.getPayload();

        String payloadString = new String(payloadByte);

        System.out.println(payloadString);

        return payloadString.contains("freqID");
    }
}

下面是动态 tcp 路由流代码,如您所见,我添加了参数名称“unsolicitedMessageChannelName”,用于创建带有 Id 的直接通道,然后将该 DirectChannel 赋给 ExtendedTcpOutboundGateway 的构造函数处理来自 tcp 服务器发送的数据,没有任何请求

private MessageChannel createNewSubflow(Message<?> message) {
    String host = (String) message.getHeaders().get("host");
    Integer port = (Integer) message.getHeaders().get("port");
    String unsolicitedMessageChannelName= (String) message.getHeaders().get("unsolicitedMessageChannelName");

    Assert.state(host != null && port != null, "host and/or port header missing");
    String hostPort = host + port;

    TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory(host, port);
    cf.setLeaveOpen(true);

    ByteArrayCrLfSerializer byteArrayCrLfSerializer = new ByteArrayCrLfSerializer();
    byteArrayCrLfSerializer.setMaxMessageSize(1048576);

    cf.setSerializer(byteArrayCrLfSerializer);
    cf.setDeserializer(byteArrayCrLfSerializer);

    DirectChannel directChannel = MessageChannels.direct(unsolicitedMessageChannelName).get();

    ExtendedTcpOutboundGateway tcpOutboundGateway = new ExtendedTcpOutboundGateway(directChannel);
    tcpOutboundGateway.setConnectionFactory(cf);

    IntegrationFlow flow = f -> f.handle(tcpOutboundGateway);

    IntegrationFlowContext.IntegrationFlowRegistration flowRegistration =
        this.flowContext.registration(flow)
            .addBean(cf)
            .id(hostPort + ".flow")
            .register();
    MessageChannel inputChannel = flowRegistration.getInputChannel();

    this.subFlows.put(hostPort, inputChannel);
    return inputChannel;
}

这是ServiceActivator和tcp客户端的代码;

@Service
public class PeriodicalData implements PeriodicalDataService {
    public void setPeriodicalDataOrder(some parameters) {


        String unsolicitedMessageChannelName="unsolicitedMessageChannelName_Test";

        byte[] result = tcpClientGateway.send(data, ip, port ,unsolicitedMessageChannelName);
        String response = new String(result);
        System.out.println("Here response for request data : " + response +" received");

    }

    @ServiceActivator(inputChannel = "unsolicitedMessageChannelName_Test")
    public void handle(String in) {
        System.out.println("Here unsolicitedMessageChannel data : " + in+" received");
    }
}

当我使用这样的组合时,我正在接受一个例外,比如

org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'unsolicitedMessageChannelName_Test'.

我不明白为什么我要接受这个例外,因为从这个 link.

可以正常使用不相同但相似的用法

我想我对 ServiceActivator 的设计或使用方式有问题,我应该怎么做才能清除异常?您有什么建议吗?

您在 createNewSubflow() 中确实喜欢这样:

DirectChannel directChannel = MessageChannels.direct(unsolicitedMessageChannelName).get();

并且您没有在应用程序上下文中将其注册为 bean。因此,此对象与提到的 @ServiceActivator 无关,并且它在运行时确实没有任何订阅者。您甚至不需要创建该对象。你需要的是采取现有的 如果您真的想将消息传递到那个 @ServiceActivator.

,则从应用程序上下文为该通道创建 bean

考虑使用 createNewSubflow()BeanFactory 注入到您的组件中以调用其 getBean(unsolicitedMessageChannelName, DirectChannel.class) 以使用适当的服务激活器订阅者访问真实的 bean。