ActiveMQ Spring Stomp: how can I change my existing code to create a persistent subscription?

I have built a working notification system in my project. My current code is:

My client (JavaScript):

let connectWebSocket = () => {
  socket = new SockJS(context.backend + '/myWebSocketEndPoint');
  stompClient = Stomp.over(socket);
  stompClient.connect({},function (frame) {
    stompClient.subscribe('/topic/notification', function(response){
      alert(response);
    });
  });
}
connectWebSocket();

Server (Java and Spring):

public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer{

@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
    config.enableSimpleBroker("/topic");
}

@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
    registry.addEndpoint("/myWebSocketEndPoint")
            .setAllowedOrigins("*")
            .withSockJS();
    }
}

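This works. Now I also want to send notifications to users while they are offline: when they log in, the notifications should be delivered to them automatically. I have to do this with ActiveMQ. I have seen some examples, but I don't really understand them. Can someone tell me exactly how to edit my code to implement a persistent subscription? Thanks a lot.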

Edit: I updated my client code:

let connectWebSocket = () => {
  let clientId =user.profile.id;
  socket = new SockJS(context.backend + '/myWebSocketEndPoint');
  stompClient = Stomp.over(socket);
  stompClient.connect({"client-id": clientId},{},function (frame) {
    stompClient.subscribe('/topic/notification', function(response){
      alert(response);
    },{"activemq.subscriptionName": clientId});
  });
}

But if a notification arrives while the user is offline, it is not delivered to him when he comes back online. I think I have to change my server side.

POM.xml

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.14.2</version>
</dependency>

Edit 2: With the correct dependencies in pom.xml, I now get an error. I have this configuration:

@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
   config.enableStompBrokerRelay("/topic/");
}

But when I run my code, I see this error:

2017/01/24 17:17:15.751 ERROR [org.springframework.boot.SpringApplication:839] Application startup failed
org.springframework.context.ApplicationContextException: Failed to start bean 'stompBrokerRelayMessageHandler'; nested exception is java.lang.NoClassDefFoundError: reactor/io/codec/Codec

Edit 3: This is how I send notifications to clients:

@Component
public class MenuItemNotificationSender {

@Autowired
private SimpMessagingTemplate messagingTemplate;

@Autowired
public MenuItemNotificationSender(SimpMessagingTemplate messagingTemplate){
    this.messagingTemplate = messagingTemplate;
}

public void sendNotification(MenuItemDto menuItem) {
    messagingTemplate.convertAndSend("/topic/notification", menuItem);
}
}

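If you want to send notifications to users who were offline as well, you need to use a durable subscription. With the default AMQ configuration this is the default behavior for durable subscribers: the messages are persisted until the subscriber reconnects.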
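To make the difference concrete, here is a toy in-memory model of what a durable subscription does. It is only a sketch of the semantics, not how ActiveMQ is implemented, and the class and method names (`ToyTopic`, `connectDurable`, and so on) are made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

/** Toy model of a topic with durable subscriptions; NOT ActiveMQ's implementation. */
final class ToyTopic {
    // One backlog per durable subscription name; it outlives disconnects.
    private final Map<String, Queue<String>> durableBacklog = new HashMap<>();
    // Subscription names that are currently connected.
    private final Set<String> online = new HashSet<>();

    /** Registers the subscription if it is new, marks it online, and returns what it missed. */
    List<String> connectDurable(String name) {
        Queue<String> backlog = durableBacklog.computeIfAbsent(name, k -> new ArrayDeque<>());
        online.add(name);
        List<String> missed = new ArrayList<>(backlog);
        backlog.clear();
        return missed;
    }

    /** Marks the subscriber offline; its subscription (and backlog) stays registered. */
    void disconnect(String name) {
        online.remove(name);
    }

    /**
     * Queues the message for every durable subscription that is offline.
     * Online subscribers would receive it directly; that delivery is not modelled here.
     */
    void publish(String message) {
        for (Map.Entry<String, Queue<String>> entry : durableBacklog.entrySet()) {
            if (!online.contains(entry.getKey())) {
                entry.getValue().add(message);
            }
        }
    }
}
```

A subscriber that was never registered as durable has no backlog in this model, which mirrors why a plain topic subscription misses everything published while it was away.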

Edit

Persistent Messaging in STOMP: STOMP messages are non-persistent by default. To use persistent messaging add the following STOMP header to all SEND requests: persistent:true. This default is the opposite of that for JMS messages.

To persist the messages you send from the JS client, you need to add the header in this method call:

stompClient.send(destination,  {"persistent":"true" }, body);
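For reference, this is roughly what such a SEND looks like on the wire. The sketch below builds the frame text by hand in plain Java, following the STOMP frame layout (command line, `name:value` headers, blank line, body, NUL terminator). `StompSendFrame` is a hypothetical helper, and it does not escape header values or add `content-length`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper that renders a STOMP SEND frame as text. */
final class StompSendFrame {

    /** Builds a SEND frame; with persistent=true it carries the persistent:true header. */
    static String build(String destination, String body, boolean persistent) {
        // LinkedHashMap keeps the headers in insertion order.
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("destination", destination);
        if (persistent) {
            headers.put("persistent", "true");
        }

        StringBuilder frame = new StringBuilder("SEND\n");
        for (Map.Entry<String, String> header : headers.entrySet()) {
            frame.append(header.getKey()).append(':').append(header.getValue()).append('\n');
        }
        // A blank line ends the headers; a NUL character ends the frame.
        return frame.append('\n').append(body).append('\0').toString();
    }
}
```

Calling `StompSendFrame.build("/topic/notification", "hi", true)` yields a frame whose header section contains `persistent:true`; with `false` the header is simply absent and the message is sent non-persistent.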

Update your MenuItemNotificationSender like this:

public void sendNotification(MenuItemDto menuItem) {
    Map<String, Object> headers = new HashMap<>();
    headers.put("JMSDeliveryMode", 2);
    headers.put("persistent", "true");
    messagingTemplate.convertAndSend("/topic/notification", menuItem, headers);
}

Take a look at:

http://activemq.apache.org/how-do-i-make-messages-durable.html

http://activemq.apache.org/how-do-durable-queues-and-topics-work.html

Durable subscription with STOMP:

    // connect(headers, connectCallback, errorCallback)
    stompClient.connect( {"client-id": "my-client-id" }, function ( frame ) {

      console.log( 'Connected: ' + frame );

      // The subscription name makes the subscription durable for this client-id.
      stompClient.subscribe( topic, function ( message ) {
        // .....
        // .....
      }, {"activemq.subscriptionName": "my-client-id"});
    }, function ( frame ) {
      console.log("Web socket disconnected");
    });
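The two headers are what matter here: `client-id` on CONNECT and `activemq.subscriptionName` on SUBSCRIBE. As a rough illustration, the frames sent to the broker would look something like the output of this plain-Java sketch. `DurableStompFrames` is a made-up helper, and the `accept-version` and `host` values are assumptions for a STOMP 1.1+ connection to a local broker.

```java
/** Hypothetical helper that renders the frames used for a durable STOMP subscription. */
final class DurableStompFrames {

    /** CONNECT frame carrying the client-id that identifies this durable client. */
    static String connect(String clientId) {
        return "CONNECT\n"
             + "accept-version:1.1,1.2\n"   // assumed protocol versions
             + "host:localhost\n"           // assumed virtual host
             + "client-id:" + clientId + "\n"
             + "\n\0";                      // blank line ends headers, NUL ends the frame
    }

    /** SUBSCRIBE frame; activemq.subscriptionName marks the subscription as durable. */
    static String subscribe(String subscriptionId, String destination, String subscriptionName) {
        return "SUBSCRIBE\n"
             + "id:" + subscriptionId + "\n"
             + "destination:" + destination + "\n"
             + "activemq.subscriptionName:" + subscriptionName + "\n"
             + "\n\0";
    }
}
```

The client has to reconnect with the same `client-id` and subscription name each time, otherwise the broker treats it as a different subscriber and the stored messages stay with the old one.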

Update

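import java.io.File;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.AbstractWebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

    // Assumed values, chosen to match the embedded STOMP connector below.
    private final String relayHost = "localhost";
    private final int relayPort = 61613;

    // Embedded ActiveMQ broker with a KahaDB store, started and stopped with the context.
    @Bean(initMethod = "start", destroyMethod = "stop")
    public BrokerService broker() throws Exception {
        final BrokerService broker = new BrokerService();
        //broker.addConnector("tcp://localhost:61616");
        broker.addConnector("stomp://localhost:61613");
        broker.addConnector("vm://localhost");
        // Store messages under ~/kaha so they survive a restart.
        PersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
        File dir = new File(System.getProperty("user.home") + File.separator + "kaha");
        if (!dir.exists()) {
            dir.mkdirs();
        }
        persistenceAdapter.setDirectory(dir);
        broker.setPersistenceAdapter(persistenceAdapter);
        broker.setPersistent(true);
        return broker;
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        // relayHost and relayPort can be omitted if AMQ runs locally on the
        // default STOMP port (the relay defaults to 127.0.0.1:61613)
        config.enableStompBrokerRelay("/topic/")
              .setRelayHost(relayHost)
              .setRelayPort(relayPort)
              // user and password if needed
              //.setSystemLogin(activeMqLogin)
              //.setSystemPasscode(activeMqPassword)
              ;
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/myWebSocketEndPoint")
                .setAllowedOrigins("*")
                .withSockJS();
    }
}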
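If the relay fails to connect at startup, it can help to first confirm that something is actually listening on the STOMP port. The following is a small plain-Java sketch for that; `RelayPortCheck` is a hypothetical helper, not part of Spring or ActiveMQ, and it only tests TCP reachability, not that the listener speaks STOMP.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper: checks whether a TCP port accepts connections. */
final class RelayPortCheck {

    /** Returns true if a TCP connection to host:port succeeds within the timeout. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host not resolvable.
            return false;
        }
    }
}
```

For the configuration above you would call something like `RelayPortCheck.isReachable("localhost", 61613, 1000)` once the broker has started.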

Using the parent POM:

<parent>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-parent</artifactId>
  <version>1.4.3.RELEASE</version>
  <relativePath /> <!-- lookup parent from repository -->
</parent>

<dependency>
  <groupId>io.projectreactor</groupId>
  <artifactId>reactor-core</artifactId>
</dependency>
<dependency>
  <groupId>io.projectreactor</groupId>
  <artifactId>reactor-net</artifactId>
</dependency>
<dependency>
  <groupId>io.projectreactor.spring</groupId>
  <artifactId>reactor-spring-context</artifactId>
</dependency>

<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-kahadb-store</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-stomp</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>