Spring 集成 TcpOutboundGateway、ServiceActivator、消息通道和错误 MessageDispatchingException:Dispatcher 没有订阅者
Spring Integration TcpOutboundGateway, ServiceActivator, Message Channel and Error MessageDispatchingException: Dispatcher has no subscribers
正如@Gary Russell 在此 中提到的,我扩展了 TcpOutboundGateway 以在没有任何请求的情况下接收来自 TCP 服务器的消息。
这里是我的自定义 TcpOutboundGateway,如果消息负载包含“freqID”,那么它将消息发送到 MessageChannel
public class ExtendedTcpOutboundGateway extends TcpOutboundGateway {
private final DirectChannel unsolicitedMessageChannel;
public ExtendedTcpOutboundGateway(DirectChannel unsolicitedMessageChannel) {
this.unsolicitedMessageChannel = unsolicitedMessageChannel;
}
@Override
public boolean onMessage(Message<?> message) {
if (isUnsolicitedMessage((Message<byte[]>) message)) {
this.messagingTemplate.send(this.unsolicitedMessageChannel, message);
return false;
} else {
return super.onMessage(message);
}
}
private boolean isUnsolicitedMessage(Message<byte[]> message) {
byte[] payloadByte = message.getPayload();
String payloadString = new String(payloadByte);
System.out.println(payloadString);
return payloadString.contains("freqID");
}
}
下面是动态 tcp 路由流代码,如您所见,我添加了参数名称“unsolicitedMessageChannelName”,用于创建带有 Id 的直接通道,然后将该 DirectChannel 赋给 ExtendedTcpOutboundGateway 的构造函数处理来自 tcp 服务器发送的数据,没有任何请求
private MessageChannel createNewSubflow(Message<?> message) {
String host = (String) message.getHeaders().get("host");
Integer port = (Integer) message.getHeaders().get("port");
String unsolicitedMessageChannelName= (String) message.getHeaders().get("unsolicitedMessageChannelName");
Assert.state(host != null && port != null, "host and/or port header missing");
String hostPort = host + port;
TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory(host, port);
cf.setLeaveOpen(true);
ByteArrayCrLfSerializer byteArrayCrLfSerializer = new ByteArrayCrLfSerializer();
byteArrayCrLfSerializer.setMaxMessageSize(1048576);
cf.setSerializer(byteArrayCrLfSerializer);
cf.setDeserializer(byteArrayCrLfSerializer);
DirectChannel directChannel = MessageChannels.direct(unsolicitedMessageChannelName).get();
ExtendedTcpOutboundGateway tcpOutboundGateway = new ExtendedTcpOutboundGateway(directChannel);
tcpOutboundGateway.setConnectionFactory(cf);
IntegrationFlow flow = f -> f.handle(tcpOutboundGateway);
IntegrationFlowContext.IntegrationFlowRegistration flowRegistration =
this.flowContext.registration(flow)
.addBean(cf)
.id(hostPort + ".flow")
.register();
MessageChannel inputChannel = flowRegistration.getInputChannel();
this.subFlows.put(hostPort, inputChannel);
return inputChannel;
}
这是ServiceActivator和tcp客户端的代码;
@Service
public class PeriodicalData implements PeriodicalDataService {
public void setPeriodicalDataOrder(some parameters) {
String unsolicitedMessageChannelName="unsolicitedMessageChannelName_Test";
byte[] result = tcpClientGateway.send(data, ip, port ,unsolicitedMessageChannelName);
String response = new String(result);
System.out.println("Here response for request data : " + response +" received");
}
@ServiceActivator(inputChannel = "unsolicitedMessageChannelName_Test")
public void handle(String in) {
System.out.println("Here unsolicitedMessageChannel data : " + in+" received");
}
}
当我使用这样的组合时,我正在接受一个例外,比如
org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'unsolicitedMessageChannelName_Test'.
我不明白为什么我要接受这个例外,因为从这个 link.
可以正常使用不相同但相似的用法
我想我对 ServiceActivator 的设计或使用方式有问题,我应该怎么做才能清除异常?您有什么建议吗?
您在 createNewSubflow()
中确实喜欢这样:
DirectChannel directChannel = MessageChannels.direct(unsolicitedMessageChannelName).get();
并且您没有在应用程序上下文中将其注册为 bean。因此,此对象与提到的 @ServiceActivator
无关,并且它在运行时确实没有任何订阅者。您甚至不需要创建该对象。你需要的是采取现有的
如果您真的想将消息传递到那个 @ServiceActivator
.
,则从应用程序上下文为该通道创建 bean
考虑使用 createNewSubflow()
将 BeanFactory
注入到您的组件中以调用其 getBean(unsolicitedMessageChannelName, DirectChannel.class)
以使用适当的服务激活器订阅者访问真实的 bean。
正如@Gary Russell 在此
这里是我的自定义 TcpOutboundGateway,如果消息负载包含“freqID”,那么它将消息发送到 MessageChannel
public class ExtendedTcpOutboundGateway extends TcpOutboundGateway {
private final DirectChannel unsolicitedMessageChannel;
public ExtendedTcpOutboundGateway(DirectChannel unsolicitedMessageChannel) {
this.unsolicitedMessageChannel = unsolicitedMessageChannel;
}
@Override
public boolean onMessage(Message<?> message) {
if (isUnsolicitedMessage((Message<byte[]>) message)) {
this.messagingTemplate.send(this.unsolicitedMessageChannel, message);
return false;
} else {
return super.onMessage(message);
}
}
private boolean isUnsolicitedMessage(Message<byte[]> message) {
byte[] payloadByte = message.getPayload();
String payloadString = new String(payloadByte);
System.out.println(payloadString);
return payloadString.contains("freqID");
}
}
下面是动态 tcp 路由流代码,如您所见,我添加了参数名称“unsolicitedMessageChannelName”,用于创建带有 Id 的直接通道,然后将该 DirectChannel 赋给 ExtendedTcpOutboundGateway 的构造函数处理来自 tcp 服务器发送的数据,没有任何请求
private MessageChannel createNewSubflow(Message<?> message) {
String host = (String) message.getHeaders().get("host");
Integer port = (Integer) message.getHeaders().get("port");
String unsolicitedMessageChannelName= (String) message.getHeaders().get("unsolicitedMessageChannelName");
Assert.state(host != null && port != null, "host and/or port header missing");
String hostPort = host + port;
TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory(host, port);
cf.setLeaveOpen(true);
ByteArrayCrLfSerializer byteArrayCrLfSerializer = new ByteArrayCrLfSerializer();
byteArrayCrLfSerializer.setMaxMessageSize(1048576);
cf.setSerializer(byteArrayCrLfSerializer);
cf.setDeserializer(byteArrayCrLfSerializer);
DirectChannel directChannel = MessageChannels.direct(unsolicitedMessageChannelName).get();
ExtendedTcpOutboundGateway tcpOutboundGateway = new ExtendedTcpOutboundGateway(directChannel);
tcpOutboundGateway.setConnectionFactory(cf);
IntegrationFlow flow = f -> f.handle(tcpOutboundGateway);
IntegrationFlowContext.IntegrationFlowRegistration flowRegistration =
this.flowContext.registration(flow)
.addBean(cf)
.id(hostPort + ".flow")
.register();
MessageChannel inputChannel = flowRegistration.getInputChannel();
this.subFlows.put(hostPort, inputChannel);
return inputChannel;
}
这是ServiceActivator和tcp客户端的代码;
@Service
public class PeriodicalData implements PeriodicalDataService {
public void setPeriodicalDataOrder(some parameters) {
String unsolicitedMessageChannelName="unsolicitedMessageChannelName_Test";
byte[] result = tcpClientGateway.send(data, ip, port ,unsolicitedMessageChannelName);
String response = new String(result);
System.out.println("Here response for request data : " + response +" received");
}
@ServiceActivator(inputChannel = "unsolicitedMessageChannelName_Test")
public void handle(String in) {
System.out.println("Here unsolicitedMessageChannel data : " + in+" received");
}
}
当我使用这样的组合时,我正在接受一个例外,比如
org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'unsolicitedMessageChannelName_Test'.
我不明白为什么我要接受这个例外,因为从这个 link.
可以正常使用不相同但相似的用法我想我对 ServiceActivator 的设计或使用方式有问题,我应该怎么做才能清除异常?您有什么建议吗?
您在 createNewSubflow()
中确实喜欢这样:
DirectChannel directChannel = MessageChannels.direct(unsolicitedMessageChannelName).get();
并且您没有在应用程序上下文中将其注册为 bean。因此,此对象与提到的 @ServiceActivator
无关,并且它在运行时确实没有任何订阅者。您甚至不需要创建该对象。你需要的是采取现有的
如果您真的想将消息传递到那个 @ServiceActivator
.
考虑使用 createNewSubflow()
将 BeanFactory
注入到您的组件中以调用其 getBean(unsolicitedMessageChannelName, DirectChannel.class)
以使用适当的服务激活器订阅者访问真实的 bean。