如果处理失败,如何将 STOMP 消息重新发送给消费者?

How to redeliver a STOMP message to the consumer in case of any processing failure?

高层架构

JMS (Producer/Consumer) <----> Artemis(STOMP) <----> Websocket-Broker-Relay-Service <----> STOMP-over-Websocket-client ( Producer/Consumer)

一些观察

  1. 在STOMP consumer中,使用client-individual ack-subscription,无论是NACK还是ACK,消息被Artemis丢弃。我希望将消息重新传递给相同或任何其他消费者。有办法实现吗?

  2. 在 JMS 消费者中,如果消费者在 Artemis 上收到消息时已关闭,则不会传递持久消息。我的期望是,一旦消费者服务再次恢复,就会传递持久消息。

class StompSessionHandlerImpl implements StompSessionHandler {
    @Override
    public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
        session.setAutoReceipt(Boolean.FALSE);
        StompHeaders headers1 = new StompHeaders();
        headers1.setDestination("/queue/msg");
        headers1.add("durable-subscription-name", messagingUtil.getServiceSubscriptionChannel());
        headers1.add("Authorization", "Bearer ".concat(token));
        headers1.setAck("client-individual");
        session.subscribe(headers1, this);

    }

    @Override
    public void handleException(StompSession session, StompCommand command, StompHeaders headers, byte[] payload, Throwable exception) {
        session.acknowledge(Objects.requireNonNull(headers.getMessageId()), false);
    }

    @Override
    public void handleTransportError(StompSession session, Throwable exception) {
        synchronized (StompSessionHandlerImpl.msgSenderLock) {
            if (exception instanceof ConnectionLostException && !getStompSession().isConnected()) {
                initStompSession();
            }
        }
    }

    @Override
    public Type getPayloadType(StompHeaders headers) {
        return COMessage.class;
    }

    @Override
    public void handleFrame(StompHeaders headers, Object payload) {
        if (payload == null) return;
        COMessage msg = (COMessage) payload;
     try {
        stompMessagingService.handleReceivedMessages(msg);
        self.stompMessagingService.getStompSession().acknowledge(headers.getMessageId(), true);
       } catch (Exception e) {
           self.stompMessagingService.getStompSession().acknowledge(headers.getMessageId(), false);
       }

    }


    @PreDestroy
    public void cleanUp() {
        self.stompMessagingService.getStompSession().disconnect();
    }

}
class WebSocketConfig extends WebSocketMessagingAutoConfiguration {
    @Bean
    public WebSocketStompClient stompClient() {
        WebSocketClient simpleWebSocketClient = new StandardWebSocketClient();
        List<Transport> transports = List.of(new WebSocketTransport(simpleWebSocketClient));
        SockJsClient sockJsClient = new SockJsClient(transports);
        WebSocketStompClient stompClient = new WebSocketStompClient(sockJsClient);
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
        converter.setObjectMapper(objectMapper);
        stompClient.setMessageConverter(converter);
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setPoolSize(10000);
        scheduler.initialize();
        stompClient.setTaskScheduler(scheduler);
        stompClient.setDefaultHeartbeat(new long[]{20000, 20000});
        stompClient.setReceiptTimeLimit(Integer.MAX_VALUE);
        ContainerProvider.getWebSocketContainer().setDefaultMaxTextMessageBufferSize(Integer.MAX_VALUE);
        return stompClient;
    }
}
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    private String host;

    private String password;

    private String user;

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableStompBrokerRelay("/queue", "/topic", "/exchange")
                .setRelayHost(host)
                .setClientLogin(user)
                .setClientPasscode(password)
                .setSystemHeartbeatSendInterval(20000)
                .setSystemLogin(user)
                .setSystemPasscode(password)
                .setUserDestinationBroadcast("/topic/unresolved-user")
                .setUserRegistryBroadcast("/topic/log-user-registry");
        config.setApplicationDestinationPrefixes("/device");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/websocket").setAllowedOrigins("*").withSockJS().setWebSocketEnabled(Boolean.TRUE);
        registry.setErrorHandler(new StompSubProtocolErrorHandler());
    }

    @Bean
    public DefaultSimpUserRegistry getDefaultSimpRegistry() {
        return new DefaultSimpUserRegistry();
    }

    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registry) {
        registry.setMessageSizeLimit(Integer.MAX_VALUE);
        registry.setSendBufferSizeLimit(Integer.MAX_VALUE);
        registry.setTimeToFirstMessage(300000);
        registry.setSendTimeLimit(300000);
        registry.addDecoratorFactory(new WebSocketHandlerDecoratorFactory() {
            @Override
            public WebSocketHandler decorate(WebSocketHandler webSocketHandler) {
                return new EmaWebSocketHandlerDecorator(webSocketHandler);
            }
        });

    }

}
class ArtemisConfig extends ArtemisAutoConfiguration {

    @Bean("mqConnectionFactory")
    public ConnectionFactory senderActiveMQConnectionFactory() {

        ActiveMQConnectionFactory connectionFactory =
               new ActiveMQConnectionFactory("tcp://".concat(host.concat(":").concat(port)));
        connectionFactory.setUser(user);
        connectionFactory.setPassword(password);
        connectionFactory.setConnectionTTL(-1L);
        connectionFactory.setClientID(clientID);
        connectionFactory.setEnableSharedClientID(true);
        connectionFactory.setPreAcknowledge(Boolean.FALSE);
        return connectionFactory;
    }

    @Bean("mqCachingConnectionFactory")
    @Primary
    public ConnectionFactory cachingConnectionFactory() {
        return new CachingConnectionFactory(senderActiveMQConnectionFactory());
    }

    @Bean("jmsTemplate")
    public JmsTemplate jmsTemplate() {
        JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory());
        jmsTemplate.setMessageConverter(jsonMessageConverter);
        jmsTemplate.setSessionAcknowledgeMode(ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE);
        jmsTemplate.setMessageIdEnabled(Boolean.TRUE);
        jmsTemplate.setTimeToLive(Integer.MAX_VALUE); // TODO : review
        return jmsTemplate;
    }

    @PreDestroy
    public void cleanUp() {
        if (connection.isStarted()) {
            try {
                connection.close();
            } catch (JMSException e) {
                log.error("Failed to close the JMS connection {0}", e);
            }
        }
    }

}

当使用 ActiveMQ Artemis 时,STOMP ACK frame tells the broker that the message has been consumed successfully so it should be removed from the queue. A STOMP NACK 帧会告诉代理该消息 成功使用,因此代理将丢弃它。 STOMP 规范未指定此处的确切行为。它只说:

NACK is the opposite of ACK. It is used to tell the server that the client did not consume the message. The server can then either send the message to a different client, discard it, or put it in a dead letter queue. The exact behavior is server specific.

NACK takes the same headers as ACK: id (REQUIRED) and transaction (OPTIONAL).

NACK applies either to one single message (if the subscription's ack mode is client-individual) or to all messages sent before and not yet ACK'ed or NACK'ed (if the subscription's ack mode is client).

如果您希望重新传递消息,您既不应该确认也不应拒绝消息,并且当消费者的连接关闭时,消息将被放回队列中以传递给另一个(或相同的)客户端。

我希望将来可以配置此行为。