Spring Boot SSL TCPClient ~ StompBrokerRelayMessageHandler ~ ActiveMQ ~ Undertow
I'm trying to build a websocket messaging application based on the Spring Websocket Demo, running ActiveMQ as the STOMP message broker with Undertow. The application runs fine on insecure connections. However, I'm having difficulty configuring the STOMP Broker Relay to forward over SSL connections.
As stated in the Spring WebSocket docs...
The "STOMP broker relay" in the above configuration is a Spring MessageHandler that handles messages by forwarding them to an external message broker. To do so it establishes TCP connections to the broker, forwards all messages to it, and then forwards all messages received from the broker to clients through their WebSocket sessions. Essentially it acts as a "relay" that forwards messages in both directions.
In addition, the docs state a dependency on reactor-net, which I have...
Please add a dependency on org.projectreactor:reactor-net for TCP connection management.
The problem is that my current implementation doesn't initialize a NettyTCPClient over SSL, so the ActiveMQ connection fails with an SSLException.
[r.i.n.i.n.t.NettyTcpClient:307] » CONNECTED:
[id: 0xcfef39e9, /127.0.0.1:17779 => localhost/127.0.0.1:8442]
...
[o.a.a.b.TransportConnection.Transport:245] »
Transport Connection to: tcp://127.0.0.1:17779 failed:
javax.net.ssl.SSLException: Unrecognized SSL message, plaintext connection?
...
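So I tried researching the Project Reactor Docs to set SSL options for the connection, but I have not been successful.
At this point I have found that StompBrokerRelayMessageHandler initializes the NettyTCPClient by default in Reactor2TcpClient, yet it doesn't appear to be configurable.
Assistance would be greatly appreciated.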
SSCCE
app.props
spring.activemq.in-memory=true
spring.activemq.pooled=false
spring.activemq.broker-url=stomp+ssl://localhost:8442
server.port=8443
server.ssl.enabled=true
server.ssl.protocol=tls
server.ssl.key-alias=undertow
server.ssl.key-store=classpath:undertow.jks
server.ssl.key-store-password=xxx
server.ssl.trust-store=classpath:undertow_certs.jks
server.ssl.trust-store-password=xxx
WebSocketConfig
//...
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
private static final Logger log = LoggerFactory.getLogger(WebSocketConfig.class);
private final static String KEYSTORE = "/activemq.jks";
private final static String KEYSTORE_PASS = "xxx";
private final static String KEYSTORE_TYPE = "JKS";
private final static String TRUSTSTORE = "/activemq_certs.jks";
private final static String TRUSTSTORE_PASS = "xxx";
private static String getBindLocation() {
return "stomp+ssl://localhost:8442?transport.needClientAuth=false";
}
@Bean(initMethod = "start", destroyMethod = "stop")
public SslBrokerService activeMQBroker() throws Exception {
final SslBrokerService service = new SslBrokerService();
service.setPersistent(false);
KeyManager[] km = SecurityManager.getKeyManager();
TrustManager[] tm = SecurityManager.getTrustManager();
service.addSslConnector(getBindLocation(), km, tm, null);
final ActiveMQTopic topic = new ActiveMQTopic("jms.topic.test");
service.setDestinations(new ActiveMQDestination[]{topic});
return service;
}
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableStompBrokerRelay("/topic").setRelayHost("localhost").setRelayPort(8442);
config.setApplicationDestinationPrefixes("/app");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/welcome").withSockJS();
registry.addEndpoint("/test").withSockJS();
}
private static class SecurityManager {
//elided...
}
}
SOLVED per Rossen's advice. Here are the implementation details for anyone interested.
WebSocketConfig
@Configuration
public class WebSocketConfig extends DelegatingWebSocketMessageBrokerConfiguration {
...
@Bean
public AbstractBrokerMessageHandler stompBrokerRelayMessageHandler() {
StompBrokerRelayMessageHandler handler = (StompBrokerRelayMessageHandler) super.stompBrokerRelayMessageHandler();
ConfigurationReader reader = new StompClientDispatcherConfigReader();
Environment environment = new Environment(reader).assignErrorJournal();
TcpOperations<byte[]> client = new Reactor2TcpClient<>(new StompTcpClientSpecFactory(environment,"localhost", 8443));
handler.setTcpClient(client);
return handler;
}
}
StompTCPClientSpecFactory
private static class StompTcpClientSpecFactory
implements NetStreams.TcpClientFactory<Message<byte[]>, Message<byte[]>> {
private static final Logger log = LoggerFactory.getLogger(StompTcpClientSpecFactory.class);
private final String host;
private final int port;
private final String KEYSTORE = "src/main/resources/tcpclient.jks";
private final String KEYSTORE_PASS = "xxx";
private final String KEYSTORE_TYPE = "JKS";
private final String TRUSTSTORE = "/src/main/resources/tcpclient_certs.jks";
private final String TRUSTSTORE_PASS = "xxx";
private final String TRUSTSTORE_TYPE = "JKS";
private final Environment environment;
private final SecurityManager tcpManager = new SecurityManager
.SSLBuilder(KEYSTORE, KEYSTORE_PASS)
.keyStoreType(KEYSTORE_TYPE)
.trustStore(TRUSTSTORE, TRUSTSTORE_PASS)
.trustStoreType(TRUSTSTORE_TYPE)
.build();
public StompTcpClientSpecFactory(Environment environment, String host, int port) {
this.environment = environment;
this.host = host;
this.port = port;
}
@Override
public Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> apply(
Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> tcpClientSpec) {
return tcpClientSpec
.ssl(new SslOptions()
.sslProtocol("TLS")
.keystoreFile(tcpManager.getKeyStore())
.keystorePasswd(tcpManager.getKeyStorePass())
.trustManagers(tcpManager::getTrustManager)
.trustManagerPasswd(tcpManager.getTrustStorePass()))
.codec(new Reactor2StompCodec(new StompEncoder(), new StompDecoder()))
.env(this.environment)
.dispatcher(this.environment.getCachedDispatchers("StompClient").get())
.connect(this.host, this.port);
}
}
StompBrokerRelayMessageHandler
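has a tcpClient property that you can set. However, it looks like we don't expose it through the WebSocketMessageBrokerConfigurer setup.
You can remove @EnableWebSocketMessageBroker and extend DelegatingWebSocketMessageBrokerConfiguration instead. It's effectively the same, but you're now extending directly from the configuration class that provides all the beans.
This allows you to override the stompBrokerRelayMessageHandler() bean and set its TcpClient property directly. Just make sure the overriding method is marked with @Bean.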
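I needed to secure a STOMP broker relay to RabbitMQ using Spring Messaging 4.2.5 and Java 8, and found the code in the question's follow-up to be out of date.
When launching my application, I supply trust store properties to trust an internal self-signed certificate authority.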
java -Djavax.net.ssl.trustStore=/etc/pki/java/server.jks -Djavax.net.ssl.trustStorePassword=xxxxx -jar build/libs/server.war
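The -D flags above set the JVM-wide default trust store. If a trust store scoped to a single client is preferred, the same trust material can be loaded with the standard JSSE classes. The following is only a minimal sketch: the class name TrustStoreContexts and its two methods are hypothetical helpers, not part of Spring or Reactor, and wiring the resulting SSLContext into a particular TCP client is left out.

```java
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.KeyStore;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;

// Hypothetical helper: builds an SSLContext from an explicit trust store
// instead of relying on the javax.net.ssl.trustStore system properties.
final class TrustStoreContexts {

    private TrustStoreContexts() {
    }

    // Loads a key store of the given type (for example "JKS") from a file.
    static KeyStore load(Path file, String type, char[] password) throws Exception {
        KeyStore store = KeyStore.getInstance(type);
        try (InputStream in = Files.newInputStream(file)) {
            store.load(in, password);
        }
        return store;
    }

    // Creates a TLS context whose trust managers are backed by the given store.
    // No key managers are installed, so no client certificate is presented.
    static SSLContext fromTrustStore(KeyStore trustStore) throws Exception {
        TrustManagerFactory factory =
                TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        factory.init(trustStore);
        SSLContext context = SSLContext.getInstance("TLS");
        context.init(null, factory.getTrustManagers(), null);
        return context;
    }
}
```

Usage would look like TrustStoreContexts.fromTrustStore(TrustStoreContexts.load(path, "JKS", password)), with the path and password taken from wherever the application keeps its secrets.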
Following Rossen's answer, I changed
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
to
@Configuration
public class WebSocketConfig extends DelegatingWebSocketMessageBrokerConfiguration {
Then, in WebSocketConfig, I provided my own AbstractBrokerMessageHandler bean:
@Bean
public AbstractBrokerMessageHandler stompBrokerRelayMessageHandler() {
AbstractBrokerMessageHandler handler = super.stompBrokerRelayMessageHandler();
if (handler instanceof StompBrokerRelayMessageHandler) {
((StompBrokerRelayMessageHandler) handler).setTcpClient(new Reactor2TcpClient<>(
new StompTcpFactory("127.0.0.1", 61614, true)
));
}
return handler;
}
The instanceof condition is there to make it easier to use a NoOpBrokerMessageHandler in unit tests.
Finally, here is the implementation of the StompTcpFactory used above:
public class StompTcpFactory implements NetStreams.TcpClientFactory<Message<byte[]>, Message<byte[]>> {
private final Environment environment = new Environment(new SynchronousDispatcherConfigReader());
private final String host;
private final int port;
private final boolean ssl;
public StompTcpFactory(String host, int port, boolean ssl) {
this.host = host;
this.port = port;
this.ssl = ssl;
}
@Override
public Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> apply(Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> tcpClientSpec) {
return tcpClientSpec
.env(environment)
.codec(new Reactor2StompCodec(new StompEncoder(), new StompDecoder()))
.ssl(ssl ? new SslOptions() : null)
.connect(host, port);
}
private static class SynchronousDispatcherConfigReader implements ConfigurationReader {
@Override
public ReactorConfiguration read() {
return new ReactorConfiguration(Collections.emptyList(), "sync", new Properties());
}
}
}
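@amoebob's answer is great, but threads are not closed properly. Every time a connection from a client is opened, a new thread is opened and never closed. I found this problem in production and spent a few days fixing it. So I suggest you change StompTcpFactory to reuse threads: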
import static java.util.Collections.emptyList;
import java.util.Properties;
import io.netty.channel.EventLoopGroup;
import org.springframework.messaging.Message;
import org.springframework.messaging.simp.stomp.Reactor2StompCodec;
import org.springframework.messaging.simp.stomp.StompDecoder;
import org.springframework.messaging.simp.stomp.StompEncoder;
import org.springframework.messaging.tcp.reactor.Reactor2TcpClient;
import reactor.Environment;
import reactor.core.config.ReactorConfiguration;
import reactor.io.net.NetStreams;
import reactor.io.net.Spec;
import reactor.io.net.config.SslOptions;
import reactor.io.net.impl.netty.NettyClientSocketOptions;
public class StompTcpFactory implements NetStreams.TcpClientFactory<Message<byte[]>, Message<byte[]>> {
private final Environment environment;
private final EventLoopGroup eventLoopGroup;
private final String host;
private final int port;
private final boolean ssl;
public StompTcpFactory(String host, int port, boolean ssl) {
this.host = host;
this.port = port;
this.ssl = ssl;
this.environment = new Environment(() -> new ReactorConfiguration(emptyList(), "sync", new Properties()));
this.eventLoopGroup = Reactor2TcpClient.initEventLoopGroup();
}
@Override
public Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> apply(Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> tcpClientSpec) {
return tcpClientSpec
.env(environment)
.options(new NettyClientSocketOptions().eventLoopGroup(eventLoopGroup))
.codec(new Reactor2StompCodec(new StompEncoder(), new StompDecoder()))
.ssl(ssl ? new SslOptions() : null)
.connect(host, port);
}
}
For everyone looking for an up-to-date solution: I managed to solve the problem in a cleaner way. Just create and use your own TCP client over SSL:
import lombok.RequiredArgsConstructor;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.messaging.simp.stomp.StompReactorNettyCodec;
import org.springframework.messaging.tcp.reactor.ReactorNettyTcpClient;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
@Configuration
@EnableWebSocketMessageBroker
@RequiredArgsConstructor
class WebsocketConfiguration implements WebSocketMessageBrokerConfigurer {
private final WebsocketProperties properties;
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws").setAllowedOrigins("*");
registry.addEndpoint("/ws").withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
ReactorNettyTcpClient<byte[]> tcpClient = new ReactorNettyTcpClient<>(configurer -> configurer
.host(properties.getRelayHost())
.port(properties.getRelayPort())
.secure(), new StompReactorNettyCodec());
registry.enableStompBrokerRelay("/queue", "/topic")
.setAutoStartup(true)
.setSystemLogin(properties.getClientLogin())
.setSystemPasscode(properties.getClientPasscode())
.setClientLogin(properties.getClientLogin())
.setClientPasscode(properties.getClientPasscode())
.setTcpClient(tcpClient);
registry.setApplicationDestinationPrefixes("/app");
}
}
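The WebsocketProperties type injected above is not shown in this answer. A minimal sketch of what it might look like, assuming it is a plain immutable holder that exposes the four getters the configuration calls. How it gets populated (for example, binding from application configuration) is an assumption left out here, and the field layout is hypothetical.

```java
// Hypothetical sketch of the properties holder used by WebsocketConfiguration.
// Only the four getters referenced in the configuration are modeled.
final class WebsocketProperties {

    private final String relayHost;
    private final int relayPort;
    private final String clientLogin;
    private final String clientPasscode;

    WebsocketProperties(String relayHost, int relayPort,
                        String clientLogin, String clientPasscode) {
        this.relayHost = relayHost;
        this.relayPort = relayPort;
        this.clientLogin = clientLogin;
        this.clientPasscode = clientPasscode;
    }

    // Host name of the external STOMP broker.
    String getRelayHost() {
        return relayHost;
    }

    // Port of the broker's STOMP (here: STOMP over TLS) connector.
    int getRelayPort() {
        return relayPort;
    }

    // Login used for both the system and the client connections above.
    String getClientLogin() {
        return clientLogin;
    }

    // Passcode used for both the system and the client connections above.
    String getClientPasscode() {
        return clientPasscode;
    }
}
```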
In my case (slightly different) I created two ReactorNettyTcpClient implementations as beans, and depending on the environment I choose the one with or without SSL.
Dependencies:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.4</version>
<relativePath/>
</parent>
.
.
.
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-stomp</artifactId>
<version>5.16.2</version>
</dependency>
<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty</artifactId>
<version>1.0.8</version>
</dependency>
I hope anyone currently trying to solve this problem finds it useful.