使用 spring-websocket 和 rabbitmq-stomp 时,消息不会发送给所有活跃订阅者
Message is not sent to all active subscribers when using spring-websocket and rabbitmq-stomp
我有一个基于 spring websocket over stomp 的网络应用程序(由 spring boot 1.5.1 提供支持)。我正在使用带有 stomp 插件的 Rabbitmq(3.6.6) 作为全功能代理。
根据 the doc of stomp,目标来自 /topic/ 的消息将传送给所有活跃订阅者。
Topic Destinations
For simple topic destinations which deliver a copy
of each message to all active subscribers, destinations of the form
/topic/ can be used. Topic destinations support all the routing
patterns of AMQP topic exchanges.
Messages sent to a topic destination that has no active subscribers
are simply discarded.
但行为不与我应用中的上述声明一致!
我在两个浏览器中打开了同一个页面。因此有两个客户端连接到 websocket 服务器。他们都订阅了以 /topic/
.
开头的相同目的地
我将消息发送到目的地/topic/<route key>
后,但只有一个客户端会收到消息。两个客户端将轮流接收来自同一目的地的消息。
在我的 spring 服务器端应用程序中,我将消息发送到如下目的地,
@Secured(User.ROLE_USER)
@MessageMapping("/comment/{liveid}")
@SendTo("/topic/comment-{liveid}")
public CommentMessage userComment(@DestinationVariable("liveid") String liveid,
@AuthenticationPrincipal UserDetails activeUser, UserComment userComment) {
logger.debug("Receiving comment message '{}' of live '{}' from user '{}'.",
userComment,liveid, activeUser.getUsername());
final User user = userService.findByUsername(activeUser.getUsername()).get();
return CommentMessage.builder().content(userComment.getContent()).sender(user.getNickname())
.senderAvatar(user.getAvatar()).build();
}
在我的客户端,它订阅了如下持久主题,
$stomp.subscribe('/topic/comment-' + $scope.lives[i].id, function(payload, headers, res) {
// do something
}, {
'durable': true,
'auto-delete': false
});
下面是我的spring应用中websocket的配置,
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractSessionWebSocketMessageBrokerConfigurer<ExpiringSession> {
@Value("${stompBroker.host:localhost}")
String brokerHost;
@Value("${stompBroker.port:61613}")
int brokerPort;
@Value("${stompBroker.login:guest}")
String brokerLogin;
@Value("${stompBroker.passcode:guest}")
String brokerPasscode;
@Value("${stompBroker.vhost:myvhost}")
String brokerVHost;
@Override
protected void configureStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/live/ws").withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableStompBrokerRelay("/topic/").setRelayHost(brokerHost).setRelayPort(
brokerPort).setSystemLogin(brokerLogin).setSystemPasscode(brokerPasscode).setVirtualHost(brokerVHost);
/**
* Both of two subscribers can receive the message if using simple broker
registry.enableSimpleBroker("/topic/");
*/
registry.setApplicationDestinationPrefixes("/app");
}
@Configuration
public class WebSocketSecurityConfig extends AbstractSecurityWebSocketMessageBrokerConfigurer {
@Override
protected void configureInbound(MessageSecurityMetadataSourceRegistry messages) {
messages.simpDestMatchers("/app/*").hasRole("USER");
}
@Override
protected boolean sameOriginDisabled() {
return true;
}
}
}
是不是我的RabbitMQ和Stomp插件配置有问题?它在使用 SimpleMessageBroker
而不是 RabbitMQ 时运行良好。
已通过 rabbimq-users group 中的讨论解决。
我是用持久订阅的,有相同的ID,消费者变成竞争消费者。
在使用持久队列或使用自动删除队列时指定不同的客户端 ID 解决了该问题。
我有一个基于 spring websocket over stomp 的网络应用程序(由 spring boot 1.5.1 提供支持)。我正在使用带有 stomp 插件的 Rabbitmq(3.6.6) 作为全功能代理。
根据 the doc of stomp,目标来自 /topic/ 的消息将传送给所有活跃订阅者。
Topic Destinations
For simple topic destinations which deliver a copy of each message to all active subscribers, destinations of the form /topic/ can be used. Topic destinations support all the routing patterns of AMQP topic exchanges.
Messages sent to a topic destination that has no active subscribers are simply discarded.
但行为不与我应用中的上述声明一致!
我在两个浏览器中打开了同一个页面。因此有两个客户端连接到 websocket 服务器。他们都订阅了以 /topic/
.
我将消息发送到目的地/topic/<route key>
后,但只有一个客户端会收到消息。两个客户端将轮流接收来自同一目的地的消息。
在我的 spring 服务器端应用程序中,我将消息发送到如下目的地,
@Secured(User.ROLE_USER)
@MessageMapping("/comment/{liveid}")
@SendTo("/topic/comment-{liveid}")
public CommentMessage userComment(@DestinationVariable("liveid") String liveid,
@AuthenticationPrincipal UserDetails activeUser, UserComment userComment) {
logger.debug("Receiving comment message '{}' of live '{}' from user '{}'.",
userComment,liveid, activeUser.getUsername());
final User user = userService.findByUsername(activeUser.getUsername()).get();
return CommentMessage.builder().content(userComment.getContent()).sender(user.getNickname())
.senderAvatar(user.getAvatar()).build();
}
在我的客户端,它订阅了如下持久主题,
$stomp.subscribe('/topic/comment-' + $scope.lives[i].id, function(payload, headers, res) {
// do something
}, {
'durable': true,
'auto-delete': false
});
下面是我的spring应用中websocket的配置,
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractSessionWebSocketMessageBrokerConfigurer<ExpiringSession> {
@Value("${stompBroker.host:localhost}")
String brokerHost;
@Value("${stompBroker.port:61613}")
int brokerPort;
@Value("${stompBroker.login:guest}")
String brokerLogin;
@Value("${stompBroker.passcode:guest}")
String brokerPasscode;
@Value("${stompBroker.vhost:myvhost}")
String brokerVHost;
@Override
protected void configureStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/live/ws").withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableStompBrokerRelay("/topic/").setRelayHost(brokerHost).setRelayPort(
brokerPort).setSystemLogin(brokerLogin).setSystemPasscode(brokerPasscode).setVirtualHost(brokerVHost);
/**
* Both of two subscribers can receive the message if using simple broker
registry.enableSimpleBroker("/topic/");
*/
registry.setApplicationDestinationPrefixes("/app");
}
@Configuration
public class WebSocketSecurityConfig extends AbstractSecurityWebSocketMessageBrokerConfigurer {
@Override
protected void configureInbound(MessageSecurityMetadataSourceRegistry messages) {
messages.simpDestMatchers("/app/*").hasRole("USER");
}
@Override
protected boolean sameOriginDisabled() {
return true;
}
}
}
是不是我的RabbitMQ和Stomp插件配置有问题?它在使用 SimpleMessageBroker
而不是 RabbitMQ 时运行良好。
已通过 rabbimq-users group 中的讨论解决。
我是用持久订阅的,有相同的ID,消费者变成竞争消费者。
在使用持久队列或使用自动删除队列时指定不同的客户端 ID 解决了该问题。