How to redeliver a STOMP message to the consumer in case of any processing failure?
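High-level architecture
JMS (Producer/Consumer) <----> Artemis (STOMP) <----> Websocket-Broker-Relay-Service <----> STOMP-over-Websocket-client (Producer/Consumer)
Some observations
In the STOMP consumer, with a client-individual ack subscription, Artemis drops the message whether it is NACKed or ACKed. I want the message to be redelivered to the same consumer or to any other consumer. Is there a way to achieve this?
In the JMS consumer, durable messages are not delivered if the consumer was down when the messages arrived at Artemis. My expectation is that durable messages are delivered once the consumer service is up again.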
class StompSessionHandlerImpl implements StompSessionHandler {
    @Override
    public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
        session.setAutoReceipt(Boolean.FALSE);
        StompHeaders headers1 = new StompHeaders();
        headers1.setDestination("/queue/msg");
        headers1.add("durable-subscription-name", messagingUtil.getServiceSubscriptionChannel());
        headers1.add("Authorization", "Bearer ".concat(token));
        headers1.setAck("client-individual");
        session.subscribe(headers1, this);
    }

    @Override
    public void handleException(StompSession session, StompCommand command, StompHeaders headers, byte[] payload, Throwable exception) {
        session.acknowledge(Objects.requireNonNull(headers.getMessageId()), false);
    }

    @Override
    public void handleTransportError(StompSession session, Throwable exception) {
        synchronized (StompSessionHandlerImpl.msgSenderLock) {
            if (exception instanceof ConnectionLostException && !getStompSession().isConnected()) {
                initStompSession();
            }
        }
    }

    @Override
    public Type getPayloadType(StompHeaders headers) {
        return COMessage.class;
    }

    @Override
    public void handleFrame(StompHeaders headers, Object payload) {
        if (payload == null) return;
        COMessage msg = (COMessage) payload;
        try {
            stompMessagingService.handleReceivedMessages(msg);
            self.stompMessagingService.getStompSession().acknowledge(headers.getMessageId(), true);
        } catch (Exception e) {
            self.stompMessagingService.getStompSession().acknowledge(headers.getMessageId(), false);
        }
    }

    @PreDestroy
    public void cleanUp() {
        self.stompMessagingService.getStompSession().disconnect();
    }
}
class WebSocketConfig extends WebSocketMessagingAutoConfiguration {
    @Bean
    public WebSocketStompClient stompClient() {
        WebSocketClient simpleWebSocketClient = new StandardWebSocketClient();
        List<Transport> transports = List.of(new WebSocketTransport(simpleWebSocketClient));
        SockJsClient sockJsClient = new SockJsClient(transports);
        WebSocketStompClient stompClient = new WebSocketStompClient(sockJsClient);
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
        converter.setObjectMapper(objectMapper);
        stompClient.setMessageConverter(converter);
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setPoolSize(10000);
        scheduler.initialize();
        stompClient.setTaskScheduler(scheduler);
        stompClient.setDefaultHeartbeat(new long[]{20000, 20000});
        stompClient.setReceiptTimeLimit(Integer.MAX_VALUE);
        ContainerProvider.getWebSocketContainer().setDefaultMaxTextMessageBufferSize(Integer.MAX_VALUE);
        return stompClient;
    }
}
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    private String host;
    private String password;
    private String user;

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableStompBrokerRelay("/queue", "/topic", "/exchange")
            .setRelayHost(host)
            .setClientLogin(user)
            .setClientPasscode(password)
            .setSystemHeartbeatSendInterval(20000)
            .setSystemLogin(user)
            .setSystemPasscode(password)
            .setUserDestinationBroadcast("/topic/unresolved-user")
            .setUserRegistryBroadcast("/topic/log-user-registry");
        config.setApplicationDestinationPrefixes("/device");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/websocket").setAllowedOrigins("*").withSockJS().setWebSocketEnabled(Boolean.TRUE);
        registry.setErrorHandler(new StompSubProtocolErrorHandler());
    }

    @Bean
    public DefaultSimpUserRegistry getDefaultSimpRegistry() {
        return new DefaultSimpUserRegistry();
    }

    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registry) {
        registry.setMessageSizeLimit(Integer.MAX_VALUE);
        registry.setSendBufferSizeLimit(Integer.MAX_VALUE);
        registry.setTimeToFirstMessage(300000);
        registry.setSendTimeLimit(300000);
        registry.addDecoratorFactory(new WebSocketHandlerDecoratorFactory() {
            @Override
            public WebSocketHandler decorate(WebSocketHandler webSocketHandler) {
                return new EmaWebSocketHandlerDecorator(webSocketHandler);
            }
        });
    }
}
class ArtemisConfig extends ArtemisAutoConfiguration {
    @Bean("mqConnectionFactory")
    public ConnectionFactory senderActiveMQConnectionFactory() {
        ActiveMQConnectionFactory connectionFactory =
            new ActiveMQConnectionFactory("tcp://".concat(host.concat(":").concat(port)));
        connectionFactory.setUser(user);
        connectionFactory.setPassword(password);
        connectionFactory.setConnectionTTL(-1L);
        connectionFactory.setClientID(clientID);
        connectionFactory.setEnableSharedClientID(true);
        connectionFactory.setPreAcknowledge(Boolean.FALSE);
        return connectionFactory;
    }

    @Bean("mqCachingConnectionFactory")
    @Primary
    public ConnectionFactory cachingConnectionFactory() {
        return new CachingConnectionFactory(senderActiveMQConnectionFactory());
    }

    @Bean("jmsTemplate")
    public JmsTemplate jmsTemplate() {
        JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory());
        jmsTemplate.setMessageConverter(jsonMessageConverter);
        jmsTemplate.setSessionAcknowledgeMode(ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE);
        jmsTemplate.setMessageIdEnabled(Boolean.TRUE);
        jmsTemplate.setTimeToLive(Integer.MAX_VALUE); // TODO : review
        return jmsTemplate;
    }

    @PreDestroy
    public void cleanUp() {
        if (connection.isStarted()) {
            try {
                connection.close();
            } catch (JMSException e) {
                // The exception is passed as the last argument so the logger records its stack trace.
                log.error("Failed to close the JMS connection", e);
            }
        }
    }
}
When using ActiveMQ Artemis, a STOMP ACK frame tells the broker that the message has been consumed successfully so it should be removed from the queue. A STOMP NACK frame tells the broker that the message was not consumed successfully, so the broker will discard it. The STOMP specification does not specify the exact behavior here. It only says:
NACK is the opposite of ACK. It is used to tell the server that the client did not consume the message. The server can then either send the message to a different client, discard it, or put it in a dead letter queue. The exact behavior is server specific.
NACK takes the same headers as ACK: id (REQUIRED) and transaction (OPTIONAL).
NACK applies either to one single message (if the subscription's ack mode is client-individual) or to all messages sent before and not yet ACK'ed or NACK'ed (if the subscription's ack mode is client).
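The scope rule in the last quoted paragraph can be illustrated with a small, self-contained sketch. It is a toy model rather than broker or Spring code: the class `NackScope`, the enum `AckMode`, and the method `settledBy` are hypothetical names introduced here, and the model assumes that `client` mode settles the named message together with everything delivered before it.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (hypothetical names): which messages does one ACK/NACK frame settle?
public class NackScope {
    enum AckMode { CLIENT, CLIENT_INDIVIDUAL }

    /**
     * Returns the ids settled by an ACK or NACK that names `id`.
     * `pending` holds delivered-but-unsettled message ids in delivery order.
     */
    static List<String> settledBy(List<String> pending, String id, AckMode mode) {
        int index = pending.indexOf(id);
        if (index < 0) {
            return List.of(); // unknown or already settled id: nothing to do
        }
        if (mode == AckMode.CLIENT_INDIVIDUAL) {
            return List.of(id); // only the named message
        }
        // client mode is cumulative: the named message and all earlier pending ones
        return new ArrayList<>(pending.subList(0, index + 1));
    }

    public static void main(String[] args) {
        List<String> pending = List.of("m1", "m2", "m3");
        System.out.println(settledBy(pending, "m2", AckMode.CLIENT_INDIVIDUAL)); // [m2]
        System.out.println(settledBy(pending, "m2", AckMode.CLIENT));            // [m1, m2]
    }
}
```

With the `client-individual` subscription used in the question, a NACK therefore affects only the one message it names.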
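If you want the message to be redelivered, you should neither ACK nor NACK it. When the consumer's connection is closed, the message is put back on the queue for delivery to another (or the same) client.
I hope this behavior will be configurable in the future.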
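The advice above can be sketched as a handler policy: ACK on success, and on failure send neither ACK nor NACK but close the connection so the broker returns the message to the queue. The sketch below is a minimal illustration under stated assumptions, not the asker's code or a Spring API: `Session`, `RecordingSession`, and `handle` are hypothetical names. In the question's handler the two calls would presumably map to `session.acknowledge(id, true)` and `session.disconnect()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class RedeliverOnFailure {
    /** Hypothetical stand-in for the two session calls this policy needs. */
    interface Session {
        void ack(String messageId);
        void disconnect();
    }

    /**
     * Processes one message. On success the message is ACKed. On failure
     * nothing is settled and the session is closed, which leaves the message
     * unsettled for the broker to requeue. Returns true if the message was ACKed.
     */
    static boolean handle(Session session, String messageId, String body, Consumer<String> processor) {
        try {
            processor.accept(body);
        } catch (RuntimeException e) {
            session.disconnect(); // deliberately no ACK and no NACK
            return false;
        }
        session.ack(messageId);
        return true;
    }

    /** Recording fake, used only to show the resulting call sequence. */
    static class RecordingSession implements Session {
        final List<String> calls = new ArrayList<>();
        public void ack(String messageId) { calls.add("ack:" + messageId); }
        public void disconnect() { calls.add("disconnect"); }
    }

    public static void main(String[] args) {
        RecordingSession session = new RecordingSession();
        handle(session, "m1", "ok", body -> { });
        handle(session, "m2", "bad", body -> { throw new IllegalStateException("processing failed"); });
        System.out.println(session.calls); // [ack:m1, disconnect]
    }
}
```

Two consequences are worth keeping in mind, both assumptions drawn from the answer rather than verified broker behavior: closing the connection likely returns every unsettled message of that consumer to the queue, not only the failed one, and the client has to reconnect and resubscribe afterwards to keep consuming.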