Spring集成和MongoDB如何实现消息队列?

How to implement message queue with Spring Integration and MongoDB?

如何配置 Spring 集成以便从集合中删除已处理的消息。在 MongoDB 控制台中,我可以简单地调用:

db.messages.findAndModify({ remove:true })

但在 MongoDbMessageSource 中仅读取消息

mongoTemplate.find(..)

我猜它可以通过在事务中删除一些来完成。但是我无法找到简单的好解决方案。

我配置的入站部分:

@Bean
@Autowired
public IntegrationFlow pollMessages(MongoDbFactory mongoDbFactory, SomeService someService) {
    return IntegrationFlows.from(
            mongoMessageSource(mongoDbFactory),
            c -> c.poller(Pollers.fixedDelay(1, TimeUnit.SECONDS)))
            .handle(someService, "process")
            .get();
}

@Bean
@Autowired
public MongoDbMessageSource mongoMessageSource(MongoDbFactory mongo) {
    MongoDbMessageSource messageSource = new MongoDbMessageSource(mongo, new LiteralExpression("{}"));
    messageSource.setEntityClass(MessageEntity.class);
    messageSource.setCollectionNameExpression(new LiteralExpression("messages"));

    return messageSource;
}

没错。要达到这样的要求,您需要查看:

/**
 * Specify the {@link TransactionSynchronizationFactory} to attach a
 * {@link org.springframework.transaction.support.TransactionSynchronization}
 * to the transaction around {@code poll} operation.
 * @param transactionSynchronizationFactory the TransactionSynchronizationFactory to use.
 * @return the spec.
 */
public PollerSpec transactionSynchronizationFactory(
        TransactionSynchronizationFactory transactionSynchronizationFactory) {

并真正从 TransactionSynchronizationProcessor.processAfterCommit() 上的集合中删除。

有关详细信息,请参阅 Reference Manual

对于 XML 配置,我们有这个测试用例:

<int-mongodb:inbound-channel-adapter id="inboundAdapterWithOnSuccessDisposition"
                                     channel="replyChannel"
                                     query="{'name' : 'Bob'}"
                                     auto-startup="false">

    <int:poller fixed-delay="200" max-messages-per-poll="1">
        <int:advice-chain  synchronization-factory="syncFactory">
            <bean
                    class="org.springframework.integration.mongodb.config.MongoDbInboundChannelAdapterIntegrationTests.TestMessageSourceAdvice" />
            <tx:advice>
                <tx:attributes>
                    <tx:method name="*" />
                </tx:attributes>
            </tx:advice>
        </int:advice-chain>
    </int:poller>
</int-mongodb:inbound-channel-adapter>

<int:transaction-synchronization-factory id="syncFactory">
    <int:before-commit expression="@documentCleaner.remove(#mongoTemplate, payload, headers.mongo_collectionName)"/>
</int:transaction-synchronization-factory>

<bean id="transactionManager" class="org.springframework.integration.transaction.PseudoTransactionManager" />

类似的事情也可以用 Java DSL 完成。

你需要DefaultTransactionSynchronizationFactoryExpressionEvaluatingTransactionSynchronizationProcessor来配置这件事。对,一样可以用PseudoTransactionManager

虽然你也可以考虑在流程结束时手动调用remove/update