ActiveMQ Spring Stomp:我如何更改现有代码以创建持久订阅
ActiveMQ Spring Stomp: how can i change my existing code to create persistent subscription
我在我的项目中创建了一个正在运行的通知系统。我的实际代码是:
我的客户(javascript):
let connectWebSocket = () => {
socket = new SockJS(context.backend + '/myWebSocketEndPoint');
stompClient = Stomp.over(socket);
stompClient.connect({},function (frame) {
stompClient.subscribe('/topic/notification', function(response){
alert(response);
});
});
}
connectWebSocket();
服务器(Java 和 Spring)
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer{
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableSimpleBroker("/topic");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/myWebSocketEndPoint")
.setAllowedOrigins("*")
.withSockJS();
}
}
这是有效的。现在我也想在用户离线时向他们发送通知:当他们登录时,我会(自动)向他们发送通知。我必须用 activeMQ 来做这件事。我看过一些例子,但不太理解它们。有人可以告诉我如何准确编辑我的代码并实现持久订阅吗?非常感谢
编辑:我更新了我的客户端代码:
let connectWebSocket = () => {
let clientId =user.profile.id;
socket = new SockJS(context.backend + '/myWebSocketEndPoint');
stompClient = Stomp.over(socket);
stompClient.connect({"client-id": clientId},{},function (frame) {
stompClient.subscribe('/topic/notification', function(response){
alert(response);
},{"activemq.subscriptionName": clientId});
});
}
但是当用户离线时,如果通知到达,当他 returns 在线时,通知不会发送给他..我想我必须改变我的服务器端
POM.xml
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.14.2</version>
</dependency>
编辑 2::
有了 pom.xml 中的正确依赖项,我现在有一个错误。我有这样的配置:
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableStompBrokerRelay("/topic/");
}
但是当我 运行 我的代码时,我看到了这个错误:
2017/01/24 17:17:15.751 ERROR [org.springframework.boot.SpringApplication:839] Application startup failed
org.springframework.context.ApplicationContextException: Failed to start bean 'stompBrokerRelayMessageHandler'; nested exception is java.lang.NoClassDefFoundError: reactor/io/codec/Codec
EDIT3: 这是我向客户发送通知的方式:
@Component
public class MenuItemNotificationSender {
@Autowired
private SimpMessagingTemplate messagingTemplate;
@Autowired
public MenuItemNotificationSender(SimpMessagingTemplate messagingTemplate){
this.messagingTemplate = messagingTemplate;
}
public void sendNotification(MenuItemDto menuItem) {
messagingTemplate.convertAndSend("/topic/notification", menuItem);
}
}
如果您使用默认的 AMQ 配置,这是持久订阅者的默认行为,消息将被持久化,
如果您想在用户离线时也向他们发送通知,您需要使用持久订阅。
编辑
Persistent Messaging in STOMP STOMP messages are non-persistent by
default. To use persistent messaging add the following STOMP header to
all SEND requests: persistent:true. This default is the opposite of
that for JMS messages.
要持久化已发送的消息,在 js 客户端上,您需要将 header 添加到此方法中:
stompClient.send(destination, {"persistent":"true" }, body);
像这样更新您的 MenuItemNotificationSender :
public void sendNotification(MenuItemDto menuItem) {
Map<String, Object> headers = new HashMap<>();
headers.put("JMSDeliveryMode", 2);
headers.put("persistent", "true");
messagingTemplate.convertAndSend("/topic/notification", menuItem, headers);
}
看看
http://activemq.apache.org/how-do-i-make-messages-durable.html
http://activemq.apache.org/how-do-durable-queues-and-topics-work.html
使用 stomp 进行持久订阅:
stompClient.connect( {"client-id": "my-client-id" },, function ( frame ) {
console.log( 'Connected: ' + frame );
stompClient.subscribe( topic, function ( message ) {
.....
.....
}, {"activemq.subscriptionName": "my-client-id"});
}, function(frame) {
console.log("Web socket disconnected");
});
更新
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer{
@Bean(initMethod = "start", destroyMethod = "stop")
public BrokerService broker() throws Exception {
final BrokerService broker = new BrokerService();
//broker.addConnector("tcp://localhost:61616");
broker.addConnector("stomp://localhost:61613");
broker.addConnector("vm://localhost");
PersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
File dir = new File(System.getProperty("user.home") + File.separator + "kaha");
if (!dir.exists()) {
dir.mkdirs();
}
persistenceAdapter.setDirectory(dir);
broker.setPersistenceAdapter(persistenceAdapter);
broker.setPersistent(true);
return broker;
}
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
// if AMQ is running in local not needed to set relayHost & relayPort
config.enableStompBrokerRelay("/topic/")
.setRelayHost(relayHost)
.setRelayPort(relayPort)
// user pwd if needed
//.setSystemLogin(activeMqLogin)
//.setSystemPasscode(activeMqPassword)
;
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/myWebSocketEndPoint")
.setAllowedOrigins("*")
.withSockJS();
}
}
使用parentpom
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.4.3.RELEASE</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-net</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor.spring</groupId>
<artifactId>reactor-spring-context</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-kahadb-store</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-stomp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
我在我的项目中创建了一个正在运行的通知系统。我的实际代码是:
我的客户(javascript):
let connectWebSocket = () => {
socket = new SockJS(context.backend + '/myWebSocketEndPoint');
stompClient = Stomp.over(socket);
stompClient.connect({},function (frame) {
stompClient.subscribe('/topic/notification', function(response){
alert(response);
});
});
}
connectWebSocket();
服务器(Java 和 Spring)
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer{
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableSimpleBroker("/topic");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/myWebSocketEndPoint")
.setAllowedOrigins("*")
.withSockJS();
}
}
这是有效的。现在我也想在用户离线时向他们发送通知:当他们登录时,我会(自动)向他们发送通知。我必须用 activeMQ 来做这件事。我看过一些例子,但不太理解它们。有人可以告诉我如何准确编辑我的代码并实现持久订阅吗?非常感谢
编辑:我更新了我的客户端代码:
let connectWebSocket = () => {
let clientId =user.profile.id;
socket = new SockJS(context.backend + '/myWebSocketEndPoint');
stompClient = Stomp.over(socket);
stompClient.connect({"client-id": clientId},{},function (frame) {
stompClient.subscribe('/topic/notification', function(response){
alert(response);
},{"activemq.subscriptionName": clientId});
});
}
但是当用户离线时,如果通知到达,当他 returns 在线时,通知不会发送给他..我想我必须改变我的服务器端
POM.xml
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.14.2</version>
</dependency>
编辑 2:: 有了 pom.xml 中的正确依赖项,我现在有一个错误。我有这样的配置:
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableStompBrokerRelay("/topic/");
}
但是当我 运行 我的代码时,我看到了这个错误:
2017/01/24 17:17:15.751 ERROR [org.springframework.boot.SpringApplication:839] Application startup failed
org.springframework.context.ApplicationContextException: Failed to start bean 'stompBrokerRelayMessageHandler'; nested exception is java.lang.NoClassDefFoundError: reactor/io/codec/Codec
EDIT3: 这是我向客户发送通知的方式:
@Component
public class MenuItemNotificationSender {
@Autowired
private SimpMessagingTemplate messagingTemplate;
@Autowired
public MenuItemNotificationSender(SimpMessagingTemplate messagingTemplate){
this.messagingTemplate = messagingTemplate;
}
public void sendNotification(MenuItemDto menuItem) {
messagingTemplate.convertAndSend("/topic/notification", menuItem);
}
}
如果您使用默认的 AMQ 配置,这是持久订阅者的默认行为,消息将被持久化, 如果您想在用户离线时也向他们发送通知,您需要使用持久订阅。
编辑
Persistent Messaging in STOMP STOMP messages are non-persistent by default. To use persistent messaging add the following STOMP header to all SEND requests: persistent:true. This default is the opposite of that for JMS messages.
要持久化已发送的消息,在 js 客户端上,您需要将 header 添加到此方法中:
stompClient.send(destination, {"persistent":"true" }, body);
像这样更新您的 MenuItemNotificationSender :
public void sendNotification(MenuItemDto menuItem) {
Map<String, Object> headers = new HashMap<>();
headers.put("JMSDeliveryMode", 2);
headers.put("persistent", "true");
messagingTemplate.convertAndSend("/topic/notification", menuItem, headers);
}
看看
http://activemq.apache.org/how-do-i-make-messages-durable.html
http://activemq.apache.org/how-do-durable-queues-and-topics-work.html
使用 stomp 进行持久订阅:
stompClient.connect( {"client-id": "my-client-id" },, function ( frame ) {
console.log( 'Connected: ' + frame );
stompClient.subscribe( topic, function ( message ) {
.....
.....
}, {"activemq.subscriptionName": "my-client-id"});
}, function(frame) {
console.log("Web socket disconnected");
});
更新
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer{
@Bean(initMethod = "start", destroyMethod = "stop")
public BrokerService broker() throws Exception {
final BrokerService broker = new BrokerService();
//broker.addConnector("tcp://localhost:61616");
broker.addConnector("stomp://localhost:61613");
broker.addConnector("vm://localhost");
PersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
File dir = new File(System.getProperty("user.home") + File.separator + "kaha");
if (!dir.exists()) {
dir.mkdirs();
}
persistenceAdapter.setDirectory(dir);
broker.setPersistenceAdapter(persistenceAdapter);
broker.setPersistent(true);
return broker;
}
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
// if AMQ is running in local not needed to set relayHost & relayPort
config.enableStompBrokerRelay("/topic/")
.setRelayHost(relayHost)
.setRelayPort(relayPort)
// user pwd if needed
//.setSystemLogin(activeMqLogin)
//.setSystemPasscode(activeMqPassword)
;
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/myWebSocketEndPoint")
.setAllowedOrigins("*")
.withSockJS();
}
}
使用parentpom
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.4.3.RELEASE</version> <relativePath /> <!-- lookup parent from repository --> </parent> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> </dependency> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-net</artifactId> </dependency> <dependency> <groupId>io.projectreactor.spring</groupId> <artifactId>reactor-spring-context</artifactId> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-kahadb-store</artifactId> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-stomp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency>