Spring 引导 - 如何连接到具有故障转移功能的外部 ActiveMQ master/slave 集群 URL

Spring boot - how to connect to external ActiveMQ master/slave cluster with failover URL

我们在不同的虚拟机上有 2 个 ActiveMQ 节点(例如主机:amq1、amq2)。它们链接为 master/slave 簇。

我们想使用故障转移协议连接到这个集群。如何才能做到这一点? Spring 启动配置:

@Configuration
@EnableWebSocketMessageBroker
public class WebsocketConfiguration extends AbstractWebSocketMessageBrokerConfigurer {

  @Override
  public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableStompBrokerRelay("/topic")
        .setRelayHost(activeMQProperties.getRelayHost())
        .setRelayPort(activeMQProperties.getRelayPort());
  }
}

不幸的是,在这里我们只能设置一个主机和一个端口。我们怎么能这样设置:

failover:(stomp://amq1:61613,stomp://amq2:61613)

更新:目前Spring使用Boot 2.3.5

在应用程序属性中:

spring.activemq.broker-url=failover://tcp://your_host_IP:61616

我不知道 Stomp 协议的最新功能,但通常在 ActiveMQ 中,我们使用前缀 tcp://(或 ssl:// 用于 SSL 安全传输)定义 openwire 协议。对于 Stomp,他们使用 stomp:// 前缀,这里是可以在 ActiveMQ 服务器端配置的示例:

    <transportConnectors>
        <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
        <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600"/>
        <transportConnector name="stomp" uri="stomp://localhost:61613"/>
        <transportConnector name="websocket" uri="ws://0.0.0.0:61614"/>
    </transportConnectors>

然后假设配置类似于两个 ActiveMQ 节点,如果您需要通过 openwire 参考使用故障转移协议,您将在您中使用 spring-boot 配置(yml 格式):

spring:
  activemq:
    broker-url=failover:(tcp://amq1:61616,tcp://amq2:61616)

但如果与 stomp 一起使用,它将是:

spring:
  activemq:
    broker-url=failover:(stomp://amq1:61613,stomp://amq2:61613)

对于 websocket 踩踏:

spring:
  activemq:
    broker-url=failover:(ws://amq1:61614,ws://amq2:61614)

备注:

  • 端口更改以匹配 ActiveMQ 上的配置。
  • 看来您并不真的需要 @Configuation bean:使用上述属性足以配置具有故障转移功能的 ActiveMQ。
  • 由于您的代码中提到了 websocket,我还提到了您的 AMQ 可能使用的三种可能性。

但是如果您 post 您的 ActiveMQ 传输部分会更好,这样会更好。如果您无权访问它,我建议您联系您的 ActiveMQ 管理员,询问正确的 url 以及访问 ActiveMQ 的方式是什么(还有其他可以启用的协议,如 MQTT 和AMPQ 提一下用的最多的)。

我已经尝试了您提到的连接字符串中的“故障转移”选项,但它没有用,并且发现一些线程甚至不支持 stomp。

所以最终的解决方案看起来像是一个自己的实现:两个具有主从配置的ActiveMQ服务器。

Spring 配置(重要部分):

@Configuration
@EnableWebSocketMessageBroker
public class WebsocketConfiguration implements WebSocketMessageBrokerConfigurer {
    private final Logger LOGGER = LoggerFactory.getLogger(WebsocketConfiguration.class);
    
    private final ActiveMQProperties activeMQProperties;
    
    // used by own round-robin implementation to connect to the current master ActiveMQ
    private int index = 0;
    
    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableStompBrokerRelay("/topic")
            .setSystemHeartbeatReceiveInterval(STOMP_SYSTEM_HEARTBEAT_INTERVAL)
            .setSystemHeartbeatSendInterval(STOMP_SYSTEM_HEARTBEAT_INTERVAL)
            .setTcpClient(createTcpClient());
    }
    
    private ReactorNettyTcpClient<byte[]> createTcpClient() {
        return new ReactorNettyTcpClient<>(
            client -> client.remoteAddress(socketAddressSupplier()),
            new StompReactorNettyCodec());
    }

    private Supplier<? extends SocketAddress> socketAddressSupplier() {

        Supplier<? extends SocketAddress> socketAddressSupplier = () -> {
            index++;
            if (index >= activeMQProperties.getActiveMQServerList().size()) {
                index = 0;
            }
            return new InetSocketAddress(activeMQProperties.getActiveMQServerList().get(index).getRelayHost(),
                activeMQProperties.getActiveMQServerList().get(index).getRelayPort());
        };

        return socketAddressSupplier;
    }
}

ActiveMQProperties:

activemq:                              
    activeMQServerList:
      -
        relayHost: host1
        relayPort: 61613
      -
        relayHost: host2
        relayPort: 61613

诀窍在于供应商。当主 ActiveMQ 出现故障时,供应商将 return 列表中的下一个配置服务器并重新连接到该服务器。

它工作正常。

setTcpClient 添加到您的 configureMessageBroker 方法中以获得 round-robin 实现,如下所示:索引将在每次当前主机不可用时在主机之间切换(setTcpClient 将次发生这种情况)

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    private final CompositeData compositeData;
    private final int stompPort;
    private final AtomicInteger currentServerIndex = new AtomicInteger(-1);

    public WebSocketConfig(@Value("${jms.broker.url}") String brokerUrl,
                           @Value("${jms.stomp.port}") int stompPort) {
        this.compositeData = parseBrokerUri(brokerUrl);
        this.stompPort = stompPort;
    }


    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/websocket")
                .setAllowedOrigins("*")
                .withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableStompBrokerRelay("/topic")
                .setRelayHost(activeMQProperties.getRelayHost())
                .setRelayPort(activeMQProperties.getRelayPort())
                .setTcpClient(new ReactorNettyTcpClient<>(
                        builder -> {
                            final TcpClient tcpClient = builder.remoteAddress(() -> new InetSocketAddress(getBrokerHost(), stompPort));
                            return isSecure() ? tcpClient.secure() : tcpClient.noSSL();
                        }, new StompReactorNettyCodec())
                );
    }

    private boolean isSecure() {
        return getBrokerUri().getScheme().equals("ssl");
    }

    private String getBrokerHost() {
        return getBrokerUri().getHost();
    }

    private URI getBrokerUri() {
        currentServerIndex.set((currentServerIndex.incrementAndGet()) % compositeData.getComponents().length);
        return compositeData.getComponents()[currentServerIndex.get()];
    }

    private CompositeData parseBrokerUri(String brokerUri) {
        try {
            return URISupport.parseComposite(new URI(brokerUri));
        } catch (URISyntaxException e) {
            throw new RuntimeException("Error parsing broker uri", e);
        }
    }
}

这里导入只是为了完整性:

import org.apache.activemq.util.URISupport;
import org.apache.activemq.util.URISupport.CompositeData;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.messaging.simp.stomp.StompReactorNettyCodec;
import org.springframework.messaging.tcp.reactor.ReactorNettyTcpClient;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
import reactor.netty.tcp.TcpClient;

import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;