如何访问 MongoDB 支持的 Spring 集成消息存储中存储的消息?

How to access stored messages in a MongoDB backed Spring Integration message store?

我实施了 Spring 集成 QueueChannel,由 MongoDbChannelMessageStore 支持。消息生成和我使用消息的集成流程都按预期工作。

现在我正在尝试实现一种逻辑,用于列出和记录消息存储或队列通道中当前包含的所有消息。消息应与其 POJO 负载一起记录(以 toString() 格式)。不应通过列出来将消息从队列通道中删除。此逻辑应该在应用程序启动期间或按需调用。

这里有一些代码片段(我使用的是 Spring Boot 2.3.4)。

我的消息负载:

@Data
public class ExampleMessage implements Serializable {
    private String id;
    private Instant timestamp;
}

我的集成配置:

@SpringBootApplication
@EnableIntegration
@Slf4j
public class IntegrationApp {

    private static final String GROUP_ID = "my-group";

    // ... main method omitted

    @Bean
    public MongoDbChannelMessageStore mongoDbChannelMessageStore(MongoDatabaseFactory mongoDatabaseFactory) {
        return new MongoDbChannelMessageStore(mongoDatabaseFactory, "message-store");
    }

    @Bean
    public PollableChannel channel(MongoDbChannelMessageStore messageStore) {
        MessageGroupQueue messageGroupQueue = new MessageGroupQueue(messageStore, GROUP_ID);
        return new QueueChannel(messageGroupQueue);
    }

    @Bean
    public IntegrationFlow integrationFlow(PollableChannel channel) {
        return IntegrationFlows.from(channel)
                .handle(message -> log.info("Message received: {}", message.getPayload()),
                        e -> e.poller(Pollers
                            .fixedRate(7000, 5000)
                            .maxMessagesPerPoll(1)
                            .taskExecutor(Executors.newSingleThreadExecutor())))
                .get();
    }

我的消息制作者:

@Component
@EnableScheduling
@Slf4j
public class ExampleMessageProducer {

    @Autowired
    private PollableChannel channel;

    private final MessagingTemplate messagingTemplate = new MessagingTemplate();

    @Scheduled(initialDelay = 1000, fixedDelay = 3000)
    void produceMessage() {
        ExampleMessage exampleMessage = new ExampleMessage();
        messagingTemplate.send(channel, MessageBuilder.withPayload(exampleMessage).build());
        log.info("Message sent: {}", exampleMessage);
    }
}

从public API of Spring Integration messaging,针对该问题可推导出以下处理方法:

@Component
@Slf4j
public class EventListenerBean {

    @Autowired
    MongoDbChannelMessageStore messageStore;

    @EventListener
    public void onApplicationEvent(ContextRefreshedEvent event) {

        Collection<Message<?>> messages = messageStore.getMessageGroup(GROUP_ID).getMessages();
        log.info("# of messages in group: {}", messages.size());
        messages.forEach(m -> log.info("Stored message: {}", m.getPayload()));
    }
}

不幸的是,这种方法会导致消息集合或消息流为空(可能是由于消息存储实现中的某种延迟加载?)。

2020-11-03 17:05:42.398  INFO 4748 --- [           main] m.t.s.integration.EventListenerBean      : # of messages in group: 0
2020-11-03 17:05:42.420  INFO 4748 --- [           main] m.t.spring.integration.IntegrationApp    : Started IntegrationApp in 6.557 seconds (JVM running for 7.532)
2020-11-03 17:05:47.533  INFO 4748 --- [pool-1-thread-1] m.t.spring.integration.IntegrationApp    : Message received: ExampleMessage(id=d803b591-2f47-412d-9d64-e8efb424f393, timestamp=2020-11-03T15:43:34.253162Z)
2020-11-03 17:05:48.345  INFO 4748 --- [pool-1-thread-1] m.t.spring.integration.IntegrationApp    : Message received: ExampleMessage(id=7361b7d6-5c36-4801-851e-6d61dc18ebb2, timestamp=2020-11-03T15:43:36.259450500Z)
2020-11-03 17:05:49.348  INFO 4748 --- [pool-1-thread-1] m.t.spring.integration.IntegrationApp    : Message received: ExampleMessage(id=7fe98beb-bd0b-4fda-a2c2-bbab2d80d1e9, timestamp=2020-11-03T15:43:38.265175Z)
2020-11-03 17:05:50.347  INFO 4748 --- [pool-1-thread-1] m.t.spring.integration.IntegrationApp    : Message received: ExampleMessage(id=375d92b6-8746-4478-911a-85b34c2ec2ab, timestamp=2020-11-03T15:43:40.270516100Z)

我想避免直接查询 MongoDB message-store 集合,而更喜欢使用 Spring 集成 API.

有人能解决这个问题吗?提前谢谢你。

如果您对具体的MessageGroup感兴趣,可以使用BasicMessageGroupStore合同的getMessageGroup()API。

无论哪种方式,在 bean 初始化阶段访问 low-level 资源(如数据库)都不好,就像您在 channel bean 定义中访问日志一样。您必须推迟这样的操作,直到整个应用程序上下文准备就绪。或捕获 ContextRefreshedEvent,或执行 SmartLifecycle.start() 合同。

更新

事实证明,您对 MongoDbChannelMessageStore 及其 API 的看法是正确的。我们绝对无法访问该合约的消息:

/**
 * Not fully used. Only wraps the provided group id.
 */
@Override
public MessageGroup getMessageGroup(Object groupId) {
    return getMessageGroupFactory().create(groupId);
}

所以,我们只是创建了一个新的空组,没有任何钩子与 MongoDB 集合中的任何内容。

作为一种变通方法,我建议您将常规 ConfigurableMongoDbMessageStore 作为 bean 针对同一个 message-store 集合。这个已经为我们提供了一个 API 来迭代组和他们的消息。因此,您将使用 MongoDbChannelMessageStoreQueueChannel 进行主动操作,并使用常规 ConfigurableMongoDbMessageStore 读取集合内容。

我们可能需要考虑在 ChannelMessageStore 中实施此 API。看起来没有什么坏处...请随时提出 GH 问题以获得此改进!

我做了一些调试。我看不出 MongoDbChannelMessageStore 的方法

public MessageGroup getMessageGroup(Object groupId) {
    return this.getMessageGroupFactory().create(groupId);
}

(我在 EventListenerBean 中使用)可以产生与空 MessageGroup.

不同的任何东西

我试图通过 MongoDbChannelMessageStore 的以下初始化来改变这种行为:

@Bean
public MongoDbChannelMessageStore mongoDbChannelMessageStore(MongoDatabaseFactory mongoDatabaseFactory) {
    MongoDbChannelMessageStore store = new MongoDbChannelMessageStore(mongoDatabaseFactory, "message-store");
    SimpleMessageGroupFactory messageGroupFactory = new SimpleMessageGroupFactory(GroupType.PERSISTENT) {
        @Override
        public MessageGroup create(Object groupId) {
            return this.create(store, groupId);
        }
    };

    store.setLazyLoadMessageGroups(false);
    store.setMessageGroupFactory(messageGroupFactory);
    return store;
}

这是一个 hack,但在它最终出现在 AbstractConfigurableMongoDbMessageStoregetMessagesForGroup() 之前一直运行良好,不幸的是,它没有实现。