Axon:如何为单个事件配置 amqp 发布?
Axon: How to configure amqp publishing for single events?
我有一个简单的 spring 驱动的服务,它通过 amqp 发布事件。
配置基于bootiful-axon.
现在我希望服务保持一些私有状态。这是一个简单的用例,可以通过 3 个额外的事件来实现。这些事件在服务范围之外没有任何意义,所以我不希望它们 "leave".
如何指定哪些事件应该通过 amqp 发布,哪些不应该发布?
我是这样解决的:
拦截send
方法的自定义SpringAMQPPublisher
:
public class SelectiveAmqpPublisher extends SpringAMQPPublisher {
static boolean shouldSend (Class<?> pt) {
return PublicEvent.class.isAssignableFrom(pt);
}
public SelectiveAmqpPublisher (
SubscribableMessageSource<EventMessage<?>> messageSource) {
super(messageSource);
}
@Override
protected void send (List<? extends EventMessage<?>> events) {
super.send(events.stream()
.filter(e -> shouldSend(e.getPayloadType()))
.collect(Collectors.toList()));
}
}
配置:
@Autowired
private AMQPProperties amqpProperties;
@Autowired
private RoutingKeyResolver routingKeyResolver;
@Autowired
private AMQPMessageConverter aMQPMessageConverter;
@Bean(initMethod = "start", destroyMethod = "shutDown")
public SpringAMQPPublisher amqpBridge(
EventBus eventBus,
ConnectionFactory connectionFactory,
AMQPMessageConverter amqpMessageConverter) {
SpringAMQPPublisher publisher = new SelectiveAmqpPublisher(eventBus);
// The rest is from axon-spring-autoconfigure...
publisher.setExchangeName(amqpProperties.getExchange());
publisher.setConnectionFactory(connectionFactory);
publisher.setMessageConverter(amqpMessageConverter);
switch (amqpProperties.getTransactionMode()) {
case TRANSACTIONAL:
publisher.setTransactional(true);
break;
case PUBLISHER_ACK:
publisher.setWaitForPublisherAck(true);
break;
case NONE:
break;
default:
throw new IllegalStateException("....");
}
return publisher;
}
我有一个简单的 spring 驱动的服务,它通过 amqp 发布事件。 配置基于bootiful-axon.
现在我希望服务保持一些私有状态。这是一个简单的用例,可以通过 3 个额外的事件来实现。这些事件在服务范围之外没有任何意义,所以我不希望它们 "leave".
如何指定哪些事件应该通过 amqp 发布,哪些不应该发布?
我是这样解决的:
拦截send
方法的自定义SpringAMQPPublisher
:
public class SelectiveAmqpPublisher extends SpringAMQPPublisher {
static boolean shouldSend (Class<?> pt) {
return PublicEvent.class.isAssignableFrom(pt);
}
public SelectiveAmqpPublisher (
SubscribableMessageSource<EventMessage<?>> messageSource) {
super(messageSource);
}
@Override
protected void send (List<? extends EventMessage<?>> events) {
super.send(events.stream()
.filter(e -> shouldSend(e.getPayloadType()))
.collect(Collectors.toList()));
}
}
配置:
@Autowired
private AMQPProperties amqpProperties;
@Autowired
private RoutingKeyResolver routingKeyResolver;
@Autowired
private AMQPMessageConverter aMQPMessageConverter;
@Bean(initMethod = "start", destroyMethod = "shutDown")
public SpringAMQPPublisher amqpBridge(
EventBus eventBus,
ConnectionFactory connectionFactory,
AMQPMessageConverter amqpMessageConverter) {
SpringAMQPPublisher publisher = new SelectiveAmqpPublisher(eventBus);
// The rest is from axon-spring-autoconfigure...
publisher.setExchangeName(amqpProperties.getExchange());
publisher.setConnectionFactory(connectionFactory);
publisher.setMessageConverter(amqpMessageConverter);
switch (amqpProperties.getTransactionMode()) {
case TRANSACTIONAL:
publisher.setTransactional(true);
break;
case PUBLISHER_ACK:
publisher.setWaitForPublisherAck(true);
break;
case NONE:
break;
default:
throw new IllegalStateException("....");
}
return publisher;
}