Spring 引导 - 如何连接到具有故障转移功能的外部 ActiveMQ master/slave 集群 URL
Spring boot - how to connect to external ActiveMQ master/slave cluster with failover URL
我们在不同的虚拟机上有 2 个 ActiveMQ 节点(例如主机:amq1、amq2)。它们链接为 master/slave 簇。
我们想使用故障转移协议连接到这个集群。如何才能做到这一点?
Spring 启动配置:
@Configuration
@EnableWebSocketMessageBroker
public class WebsocketConfiguration extends AbstractWebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableStompBrokerRelay("/topic")
.setRelayHost(activeMQProperties.getRelayHost())
.setRelayPort(activeMQProperties.getRelayPort());
}
}
不幸的是,在这里我们只能设置一个主机和一个端口。我们怎么能这样设置:
failover:(stomp://amq1:61613,stomp://amq2:61613)
更新:目前Spring使用Boot 2.3.5
在应用程序属性中:
spring.activemq.broker-url=failover://tcp://your_host_IP:61616
我不知道 Stomp 协议的最新功能,但通常在 ActiveMQ 中,我们使用前缀 tcp://
(或 ssl://
用于 SSL 安全传输)定义 openwire 协议。对于 Stomp,他们使用 stomp://
前缀,这里是可以在 ActiveMQ 服务器端配置的示例:
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireformat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://localhost:61613"/>
<transportConnector name="websocket" uri="ws://0.0.0.0:61614"/>
</transportConnectors>
然后假设配置类似于两个 ActiveMQ 节点,如果您需要通过 openwire
参考使用故障转移协议,您将在您中使用 spring-boot 配置(yml 格式):
spring:
activemq:
broker-url=failover:(tcp://amq1:61616,tcp://amq2:61616)
但如果与 stomp 一起使用,它将是:
spring:
activemq:
broker-url=failover:(stomp://amq1:61613,stomp://amq2:61613)
对于 websocket 踩踏:
spring:
activemq:
broker-url=failover:(ws://amq1:61614,ws://amq2:61614)
备注:
- 端口更改以匹配 ActiveMQ 上的配置。
- 看来您并不真的需要
@Configuation
bean:使用上述属性足以配置具有故障转移功能的 ActiveMQ。
- 由于您的代码中提到了 websocket,我还提到了您的 AMQ 可能使用的三种可能性。
但是如果您 post 您的 ActiveMQ 传输部分会更好,这样会更好。如果您无权访问它,我建议您联系您的 ActiveMQ 管理员,询问正确的 url 以及访问 ActiveMQ 的方式是什么(还有其他可以启用的协议,如 MQTT 和AMPQ 提一下用的最多的)。
我已经尝试了您提到的连接字符串中的“故障转移”选项,但它没有用,并且发现一些线程甚至不支持 stomp。
所以最终的解决方案看起来像是一个自己的实现:两个具有主从配置的ActiveMQ服务器。
Spring 配置(重要部分):
@Configuration
@EnableWebSocketMessageBroker
public class WebsocketConfiguration implements WebSocketMessageBrokerConfigurer {
private final Logger LOGGER = LoggerFactory.getLogger(WebsocketConfiguration.class);
private final ActiveMQProperties activeMQProperties;
// used by own round-robin implementation to connect to the current master ActiveMQ
private int index = 0;
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableStompBrokerRelay("/topic")
.setSystemHeartbeatReceiveInterval(STOMP_SYSTEM_HEARTBEAT_INTERVAL)
.setSystemHeartbeatSendInterval(STOMP_SYSTEM_HEARTBEAT_INTERVAL)
.setTcpClient(createTcpClient());
}
private ReactorNettyTcpClient<byte[]> createTcpClient() {
return new ReactorNettyTcpClient<>(
client -> client.remoteAddress(socketAddressSupplier()),
new StompReactorNettyCodec());
}
private Supplier<? extends SocketAddress> socketAddressSupplier() {
Supplier<? extends SocketAddress> socketAddressSupplier = () -> {
index++;
if (index >= activeMQProperties.getActiveMQServerList().size()) {
index = 0;
}
return new InetSocketAddress(activeMQProperties.getActiveMQServerList().get(index).getRelayHost(),
activeMQProperties.getActiveMQServerList().get(index).getRelayPort());
};
return socketAddressSupplier;
}
}
ActiveMQProperties:
activemq:
activeMQServerList:
-
relayHost: host1
relayPort: 61613
-
relayHost: host2
relayPort: 61613
诀窍在于供应商。当主 ActiveMQ 出现故障时,供应商将 return 列表中的下一个配置服务器并重新连接到该服务器。
它工作正常。
将 setTcpClient
添加到您的 configureMessageBroker
方法中以获得 round-robin
实现,如下所示:索引将在每次当前主机不可用时在主机之间切换(setTcpClient 将每次发生这种情况)
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
private final CompositeData compositeData;
private final int stompPort;
private final AtomicInteger currentServerIndex = new AtomicInteger(-1);
public WebSocketConfig(@Value("${jms.broker.url}") String brokerUrl,
@Value("${jms.stomp.port}") int stompPort) {
this.compositeData = parseBrokerUri(brokerUrl);
this.stompPort = stompPort;
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/websocket")
.setAllowedOrigins("*")
.withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableStompBrokerRelay("/topic")
.setRelayHost(activeMQProperties.getRelayHost())
.setRelayPort(activeMQProperties.getRelayPort())
.setTcpClient(new ReactorNettyTcpClient<>(
builder -> {
final TcpClient tcpClient = builder.remoteAddress(() -> new InetSocketAddress(getBrokerHost(), stompPort));
return isSecure() ? tcpClient.secure() : tcpClient.noSSL();
}, new StompReactorNettyCodec())
);
}
private boolean isSecure() {
return getBrokerUri().getScheme().equals("ssl");
}
private String getBrokerHost() {
return getBrokerUri().getHost();
}
private URI getBrokerUri() {
currentServerIndex.set((currentServerIndex.incrementAndGet()) % compositeData.getComponents().length);
return compositeData.getComponents()[currentServerIndex.get()];
}
private CompositeData parseBrokerUri(String brokerUri) {
try {
return URISupport.parseComposite(new URI(brokerUri));
} catch (URISyntaxException e) {
throw new RuntimeException("Error parsing broker uri", e);
}
}
}
这里导入只是为了完整性:
import org.apache.activemq.util.URISupport;
import org.apache.activemq.util.URISupport.CompositeData;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.messaging.simp.stomp.StompReactorNettyCodec;
import org.springframework.messaging.tcp.reactor.ReactorNettyTcpClient;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
import reactor.netty.tcp.TcpClient;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
我们在不同的虚拟机上有 2 个 ActiveMQ 节点(例如主机:amq1、amq2)。它们链接为 master/slave 簇。
我们想使用故障转移协议连接到这个集群。如何才能做到这一点? Spring 启动配置:
@Configuration
@EnableWebSocketMessageBroker
public class WebsocketConfiguration extends AbstractWebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableStompBrokerRelay("/topic")
.setRelayHost(activeMQProperties.getRelayHost())
.setRelayPort(activeMQProperties.getRelayPort());
}
}
不幸的是,在这里我们只能设置一个主机和一个端口。我们怎么能这样设置:
failover:(stomp://amq1:61613,stomp://amq2:61613)
更新:目前Spring使用Boot 2.3.5
在应用程序属性中:
spring.activemq.broker-url=failover://tcp://your_host_IP:61616
我不知道 Stomp 协议的最新功能,但通常在 ActiveMQ 中,我们使用前缀 tcp://
(或 ssl://
用于 SSL 安全传输)定义 openwire 协议。对于 Stomp,他们使用 stomp://
前缀,这里是可以在 ActiveMQ 服务器端配置的示例:
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireformat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://localhost:61613"/>
<transportConnector name="websocket" uri="ws://0.0.0.0:61614"/>
</transportConnectors>
然后假设配置类似于两个 ActiveMQ 节点,如果您需要通过 openwire
参考使用故障转移协议,您将在您中使用 spring-boot 配置(yml 格式):
spring:
activemq:
broker-url=failover:(tcp://amq1:61616,tcp://amq2:61616)
但如果与 stomp 一起使用,它将是:
spring:
activemq:
broker-url=failover:(stomp://amq1:61613,stomp://amq2:61613)
对于 websocket 踩踏:
spring:
activemq:
broker-url=failover:(ws://amq1:61614,ws://amq2:61614)
备注:
- 端口更改以匹配 ActiveMQ 上的配置。
- 看来您并不真的需要
@Configuation
bean:使用上述属性足以配置具有故障转移功能的 ActiveMQ。 - 由于您的代码中提到了 websocket,我还提到了您的 AMQ 可能使用的三种可能性。
但是如果您 post 您的 ActiveMQ 传输部分会更好,这样会更好。如果您无权访问它,我建议您联系您的 ActiveMQ 管理员,询问正确的 url 以及访问 ActiveMQ 的方式是什么(还有其他可以启用的协议,如 MQTT 和AMPQ 提一下用的最多的)。
我已经尝试了您提到的连接字符串中的“故障转移”选项,但它没有用,并且发现一些线程甚至不支持 stomp。
所以最终的解决方案看起来像是一个自己的实现:两个具有主从配置的ActiveMQ服务器。
Spring 配置(重要部分):
@Configuration
@EnableWebSocketMessageBroker
public class WebsocketConfiguration implements WebSocketMessageBrokerConfigurer {
private final Logger LOGGER = LoggerFactory.getLogger(WebsocketConfiguration.class);
private final ActiveMQProperties activeMQProperties;
// used by own round-robin implementation to connect to the current master ActiveMQ
private int index = 0;
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableStompBrokerRelay("/topic")
.setSystemHeartbeatReceiveInterval(STOMP_SYSTEM_HEARTBEAT_INTERVAL)
.setSystemHeartbeatSendInterval(STOMP_SYSTEM_HEARTBEAT_INTERVAL)
.setTcpClient(createTcpClient());
}
private ReactorNettyTcpClient<byte[]> createTcpClient() {
return new ReactorNettyTcpClient<>(
client -> client.remoteAddress(socketAddressSupplier()),
new StompReactorNettyCodec());
}
private Supplier<? extends SocketAddress> socketAddressSupplier() {
Supplier<? extends SocketAddress> socketAddressSupplier = () -> {
index++;
if (index >= activeMQProperties.getActiveMQServerList().size()) {
index = 0;
}
return new InetSocketAddress(activeMQProperties.getActiveMQServerList().get(index).getRelayHost(),
activeMQProperties.getActiveMQServerList().get(index).getRelayPort());
};
return socketAddressSupplier;
}
}
ActiveMQProperties:
activemq:
activeMQServerList:
-
relayHost: host1
relayPort: 61613
-
relayHost: host2
relayPort: 61613
诀窍在于供应商。当主 ActiveMQ 出现故障时,供应商将 return 列表中的下一个配置服务器并重新连接到该服务器。
它工作正常。
将 setTcpClient
添加到您的 configureMessageBroker
方法中以获得 round-robin
实现,如下所示:索引将在每次当前主机不可用时在主机之间切换(setTcpClient 将每次发生这种情况)
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
private final CompositeData compositeData;
private final int stompPort;
private final AtomicInteger currentServerIndex = new AtomicInteger(-1);
public WebSocketConfig(@Value("${jms.broker.url}") String brokerUrl,
@Value("${jms.stomp.port}") int stompPort) {
this.compositeData = parseBrokerUri(brokerUrl);
this.stompPort = stompPort;
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/websocket")
.setAllowedOrigins("*")
.withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableStompBrokerRelay("/topic")
.setRelayHost(activeMQProperties.getRelayHost())
.setRelayPort(activeMQProperties.getRelayPort())
.setTcpClient(new ReactorNettyTcpClient<>(
builder -> {
final TcpClient tcpClient = builder.remoteAddress(() -> new InetSocketAddress(getBrokerHost(), stompPort));
return isSecure() ? tcpClient.secure() : tcpClient.noSSL();
}, new StompReactorNettyCodec())
);
}
private boolean isSecure() {
return getBrokerUri().getScheme().equals("ssl");
}
private String getBrokerHost() {
return getBrokerUri().getHost();
}
private URI getBrokerUri() {
currentServerIndex.set((currentServerIndex.incrementAndGet()) % compositeData.getComponents().length);
return compositeData.getComponents()[currentServerIndex.get()];
}
private CompositeData parseBrokerUri(String brokerUri) {
try {
return URISupport.parseComposite(new URI(brokerUri));
} catch (URISyntaxException e) {
throw new RuntimeException("Error parsing broker uri", e);
}
}
}
这里导入只是为了完整性:
import org.apache.activemq.util.URISupport;
import org.apache.activemq.util.URISupport.CompositeData;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.messaging.simp.stomp.StompReactorNettyCodec;
import org.springframework.messaging.tcp.reactor.ReactorNettyTcpClient;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
import reactor.netty.tcp.TcpClient;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;