Spring集成和MongoDB如何实现消息队列?
How to implement message queue with Spring Integration and MongoDB?
如何配置 Spring 集成以便从集合中删除已处理的消息。在 MongoDB 控制台中,我可以简单地调用:
db.messages.findAndModify({ remove:true })
但在 MongoDbMessageSource 中仅读取消息
mongoTemplate.find(..)
我猜它可以通过在事务中删除一些来完成。但是我无法找到简单的好解决方案。
我配置的入站部分:
@Bean
@Autowired
public IntegrationFlow pollMessages(MongoDbFactory mongoDbFactory, SomeService someService) {
return IntegrationFlows.from(
mongoMessageSource(mongoDbFactory),
c -> c.poller(Pollers.fixedDelay(1, TimeUnit.SECONDS)))
.handle(someService, "process")
.get();
}
@Bean
@Autowired
public MongoDbMessageSource mongoMessageSource(MongoDbFactory mongo) {
MongoDbMessageSource messageSource = new MongoDbMessageSource(mongo, new LiteralExpression("{}"));
messageSource.setEntityClass(MessageEntity.class);
messageSource.setCollectionNameExpression(new LiteralExpression("messages"));
return messageSource;
}
没错。要达到这样的要求,您需要查看:
/**
* Specify the {@link TransactionSynchronizationFactory} to attach a
* {@link org.springframework.transaction.support.TransactionSynchronization}
* to the transaction around {@code poll} operation.
* @param transactionSynchronizationFactory the TransactionSynchronizationFactory to use.
* @return the spec.
*/
public PollerSpec transactionSynchronizationFactory(
TransactionSynchronizationFactory transactionSynchronizationFactory) {
并真正从 TransactionSynchronizationProcessor.processAfterCommit()
上的集合中删除。
有关详细信息,请参阅 Reference Manual。
对于 XML 配置,我们有这个测试用例:
<int-mongodb:inbound-channel-adapter id="inboundAdapterWithOnSuccessDisposition"
channel="replyChannel"
query="{'name' : 'Bob'}"
auto-startup="false">
<int:poller fixed-delay="200" max-messages-per-poll="1">
<int:advice-chain synchronization-factory="syncFactory">
<bean
class="org.springframework.integration.mongodb.config.MongoDbInboundChannelAdapterIntegrationTests.TestMessageSourceAdvice" />
<tx:advice>
<tx:attributes>
<tx:method name="*" />
</tx:attributes>
</tx:advice>
</int:advice-chain>
</int:poller>
</int-mongodb:inbound-channel-adapter>
<int:transaction-synchronization-factory id="syncFactory">
<int:before-commit expression="@documentCleaner.remove(#mongoTemplate, payload, headers.mongo_collectionName)"/>
</int:transaction-synchronization-factory>
<bean id="transactionManager" class="org.springframework.integration.transaction.PseudoTransactionManager" />
类似的事情也可以用 Java DSL 完成。
你需要DefaultTransactionSynchronizationFactory
和ExpressionEvaluatingTransactionSynchronizationProcessor
来配置这件事。对,一样可以用PseudoTransactionManager
虽然你也可以考虑在流程结束时手动调用remove/update
。
如何配置 Spring 集成以便从集合中删除已处理的消息。在 MongoDB 控制台中,我可以简单地调用:
db.messages.findAndModify({ remove:true })
但在 MongoDbMessageSource 中仅读取消息
mongoTemplate.find(..)
我猜它可以通过在事务中删除一些来完成。但是我无法找到简单的好解决方案。
我配置的入站部分:
@Bean
@Autowired
public IntegrationFlow pollMessages(MongoDbFactory mongoDbFactory, SomeService someService) {
return IntegrationFlows.from(
mongoMessageSource(mongoDbFactory),
c -> c.poller(Pollers.fixedDelay(1, TimeUnit.SECONDS)))
.handle(someService, "process")
.get();
}
@Bean
@Autowired
public MongoDbMessageSource mongoMessageSource(MongoDbFactory mongo) {
MongoDbMessageSource messageSource = new MongoDbMessageSource(mongo, new LiteralExpression("{}"));
messageSource.setEntityClass(MessageEntity.class);
messageSource.setCollectionNameExpression(new LiteralExpression("messages"));
return messageSource;
}
没错。要达到这样的要求,您需要查看:
/**
* Specify the {@link TransactionSynchronizationFactory} to attach a
* {@link org.springframework.transaction.support.TransactionSynchronization}
* to the transaction around {@code poll} operation.
* @param transactionSynchronizationFactory the TransactionSynchronizationFactory to use.
* @return the spec.
*/
public PollerSpec transactionSynchronizationFactory(
TransactionSynchronizationFactory transactionSynchronizationFactory) {
并真正从 TransactionSynchronizationProcessor.processAfterCommit()
上的集合中删除。
有关详细信息,请参阅 Reference Manual。
对于 XML 配置,我们有这个测试用例:
<int-mongodb:inbound-channel-adapter id="inboundAdapterWithOnSuccessDisposition"
channel="replyChannel"
query="{'name' : 'Bob'}"
auto-startup="false">
<int:poller fixed-delay="200" max-messages-per-poll="1">
<int:advice-chain synchronization-factory="syncFactory">
<bean
class="org.springframework.integration.mongodb.config.MongoDbInboundChannelAdapterIntegrationTests.TestMessageSourceAdvice" />
<tx:advice>
<tx:attributes>
<tx:method name="*" />
</tx:attributes>
</tx:advice>
</int:advice-chain>
</int:poller>
</int-mongodb:inbound-channel-adapter>
<int:transaction-synchronization-factory id="syncFactory">
<int:before-commit expression="@documentCleaner.remove(#mongoTemplate, payload, headers.mongo_collectionName)"/>
</int:transaction-synchronization-factory>
<bean id="transactionManager" class="org.springframework.integration.transaction.PseudoTransactionManager" />
类似的事情也可以用 Java DSL 完成。
你需要DefaultTransactionSynchronizationFactory
和ExpressionEvaluatingTransactionSynchronizationProcessor
来配置这件事。对,一样可以用PseudoTransactionManager
虽然你也可以考虑在流程结束时手动调用remove/update
。