如何访问 MongoDB 支持的 Spring 集成消息存储中存储的消息?
How to access stored messages in a MongoDB backed Spring Integration message store?
我实施了 Spring 集成 QueueChannel
,由 MongoDbChannelMessageStore
支持。消息生成和我使用消息的集成流程都按预期工作。
现在我正在尝试实现一种逻辑,用于列出和记录消息存储或队列通道中当前包含的所有消息。消息应与其 POJO 负载一起记录(以 toString()
格式)。不应通过列出来将消息从队列通道中删除。此逻辑应该在应用程序启动期间或按需调用。
这里有一些代码片段(我使用的是 Spring Boot 2.3.4)。
我的消息负载:
@Data
public class ExampleMessage implements Serializable {
private String id;
private Instant timestamp;
}
我的集成配置:
@SpringBootApplication
@EnableIntegration
@Slf4j
public class IntegrationApp {
private static final String GROUP_ID = "my-group";
// ... main method omitted
@Bean
public MongoDbChannelMessageStore mongoDbChannelMessageStore(MongoDatabaseFactory mongoDatabaseFactory) {
return new MongoDbChannelMessageStore(mongoDatabaseFactory, "message-store");
}
@Bean
public PollableChannel channel(MongoDbChannelMessageStore messageStore) {
MessageGroupQueue messageGroupQueue = new MessageGroupQueue(messageStore, GROUP_ID);
return new QueueChannel(messageGroupQueue);
}
@Bean
public IntegrationFlow integrationFlow(PollableChannel channel) {
return IntegrationFlows.from(channel)
.handle(message -> log.info("Message received: {}", message.getPayload()),
e -> e.poller(Pollers
.fixedRate(7000, 5000)
.maxMessagesPerPoll(1)
.taskExecutor(Executors.newSingleThreadExecutor())))
.get();
}
我的消息制作者:
@Component
@EnableScheduling
@Slf4j
public class ExampleMessageProducer {
@Autowired
private PollableChannel channel;
private final MessagingTemplate messagingTemplate = new MessagingTemplate();
@Scheduled(initialDelay = 1000, fixedDelay = 3000)
void produceMessage() {
ExampleMessage exampleMessage = new ExampleMessage();
messagingTemplate.send(channel, MessageBuilder.withPayload(exampleMessage).build());
log.info("Message sent: {}", exampleMessage);
}
}
从public API of Spring Integration messaging,针对该问题可推导出以下处理方法:
@Component
@Slf4j
public class EventListenerBean {
@Autowired
MongoDbChannelMessageStore messageStore;
@EventListener
public void onApplicationEvent(ContextRefreshedEvent event) {
Collection<Message<?>> messages = messageStore.getMessageGroup(GROUP_ID).getMessages();
log.info("# of messages in group: {}", messages.size());
messages.forEach(m -> log.info("Stored message: {}", m.getPayload()));
}
}
不幸的是,这种方法会导致消息集合或消息流为空(可能是由于消息存储实现中的某种延迟加载?)。
2020-11-03 17:05:42.398 INFO 4748 --- [ main] m.t.s.integration.EventListenerBean : # of messages in group: 0
2020-11-03 17:05:42.420 INFO 4748 --- [ main] m.t.spring.integration.IntegrationApp : Started IntegrationApp in 6.557 seconds (JVM running for 7.532)
2020-11-03 17:05:47.533 INFO 4748 --- [pool-1-thread-1] m.t.spring.integration.IntegrationApp : Message received: ExampleMessage(id=d803b591-2f47-412d-9d64-e8efb424f393, timestamp=2020-11-03T15:43:34.253162Z)
2020-11-03 17:05:48.345 INFO 4748 --- [pool-1-thread-1] m.t.spring.integration.IntegrationApp : Message received: ExampleMessage(id=7361b7d6-5c36-4801-851e-6d61dc18ebb2, timestamp=2020-11-03T15:43:36.259450500Z)
2020-11-03 17:05:49.348 INFO 4748 --- [pool-1-thread-1] m.t.spring.integration.IntegrationApp : Message received: ExampleMessage(id=7fe98beb-bd0b-4fda-a2c2-bbab2d80d1e9, timestamp=2020-11-03T15:43:38.265175Z)
2020-11-03 17:05:50.347 INFO 4748 --- [pool-1-thread-1] m.t.spring.integration.IntegrationApp : Message received: ExampleMessage(id=375d92b6-8746-4478-911a-85b34c2ec2ab, timestamp=2020-11-03T15:43:40.270516100Z)
我想避免直接查询 MongoDB message-store
集合,而更喜欢使用 Spring 集成 API.
有人能解决这个问题吗?提前谢谢你。
如果您对具体的MessageGroup
感兴趣,可以使用BasicMessageGroupStore
合同的getMessageGroup()
API。
无论哪种方式,在 bean 初始化阶段访问 low-level 资源(如数据库)都不好,就像您在 channel
bean 定义中访问日志一样。您必须推迟这样的操作,直到整个应用程序上下文准备就绪。或捕获 ContextRefreshedEvent
,或执行 SmartLifecycle.start()
合同。
更新
事实证明,您对 MongoDbChannelMessageStore
及其 API 的看法是正确的。我们绝对无法访问该合约的消息:
/**
* Not fully used. Only wraps the provided group id.
*/
@Override
public MessageGroup getMessageGroup(Object groupId) {
return getMessageGroupFactory().create(groupId);
}
所以,我们只是创建了一个新的空组,没有任何钩子与 MongoDB 集合中的任何内容。
作为一种变通方法,我建议您将常规 ConfigurableMongoDbMessageStore
作为 bean 针对同一个 message-store
集合。这个已经为我们提供了一个 API 来迭代组和他们的消息。因此,您将使用 MongoDbChannelMessageStore
对 QueueChannel
进行主动操作,并使用常规 ConfigurableMongoDbMessageStore
读取集合内容。
我们可能需要考虑在 ChannelMessageStore
中实施此 API。看起来没有什么坏处...请随时提出 GH 问题以获得此改进!
我做了一些调试。我看不出 MongoDbChannelMessageStore
的方法
public MessageGroup getMessageGroup(Object groupId) {
return this.getMessageGroupFactory().create(groupId);
}
(我在 EventListenerBean
中使用)可以产生与空 MessageGroup
.
不同的任何东西
我试图通过 MongoDbChannelMessageStore
的以下初始化来改变这种行为:
@Bean
public MongoDbChannelMessageStore mongoDbChannelMessageStore(MongoDatabaseFactory mongoDatabaseFactory) {
MongoDbChannelMessageStore store = new MongoDbChannelMessageStore(mongoDatabaseFactory, "message-store");
SimpleMessageGroupFactory messageGroupFactory = new SimpleMessageGroupFactory(GroupType.PERSISTENT) {
@Override
public MessageGroup create(Object groupId) {
return this.create(store, groupId);
}
};
store.setLazyLoadMessageGroups(false);
store.setMessageGroupFactory(messageGroupFactory);
return store;
}
这是一个 hack,但在它最终出现在 AbstractConfigurableMongoDbMessageStore
的 getMessagesForGroup()
之前一直运行良好,不幸的是,它没有实现。
我实施了 Spring 集成 QueueChannel
,由 MongoDbChannelMessageStore
支持。消息生成和我使用消息的集成流程都按预期工作。
现在我正在尝试实现一种逻辑,用于列出和记录消息存储或队列通道中当前包含的所有消息。消息应与其 POJO 负载一起记录(以 toString()
格式)。不应通过列出来将消息从队列通道中删除。此逻辑应该在应用程序启动期间或按需调用。
这里有一些代码片段(我使用的是 Spring Boot 2.3.4)。
我的消息负载:
@Data
public class ExampleMessage implements Serializable {
private String id;
private Instant timestamp;
}
我的集成配置:
@SpringBootApplication
@EnableIntegration
@Slf4j
public class IntegrationApp {
private static final String GROUP_ID = "my-group";
// ... main method omitted
@Bean
public MongoDbChannelMessageStore mongoDbChannelMessageStore(MongoDatabaseFactory mongoDatabaseFactory) {
return new MongoDbChannelMessageStore(mongoDatabaseFactory, "message-store");
}
@Bean
public PollableChannel channel(MongoDbChannelMessageStore messageStore) {
MessageGroupQueue messageGroupQueue = new MessageGroupQueue(messageStore, GROUP_ID);
return new QueueChannel(messageGroupQueue);
}
@Bean
public IntegrationFlow integrationFlow(PollableChannel channel) {
return IntegrationFlows.from(channel)
.handle(message -> log.info("Message received: {}", message.getPayload()),
e -> e.poller(Pollers
.fixedRate(7000, 5000)
.maxMessagesPerPoll(1)
.taskExecutor(Executors.newSingleThreadExecutor())))
.get();
}
我的消息制作者:
@Component
@EnableScheduling
@Slf4j
public class ExampleMessageProducer {
@Autowired
private PollableChannel channel;
private final MessagingTemplate messagingTemplate = new MessagingTemplate();
@Scheduled(initialDelay = 1000, fixedDelay = 3000)
void produceMessage() {
ExampleMessage exampleMessage = new ExampleMessage();
messagingTemplate.send(channel, MessageBuilder.withPayload(exampleMessage).build());
log.info("Message sent: {}", exampleMessage);
}
}
从public API of Spring Integration messaging,针对该问题可推导出以下处理方法:
@Component
@Slf4j
public class EventListenerBean {
@Autowired
MongoDbChannelMessageStore messageStore;
@EventListener
public void onApplicationEvent(ContextRefreshedEvent event) {
Collection<Message<?>> messages = messageStore.getMessageGroup(GROUP_ID).getMessages();
log.info("# of messages in group: {}", messages.size());
messages.forEach(m -> log.info("Stored message: {}", m.getPayload()));
}
}
不幸的是,这种方法会导致消息集合或消息流为空(可能是由于消息存储实现中的某种延迟加载?)。
2020-11-03 17:05:42.398 INFO 4748 --- [ main] m.t.s.integration.EventListenerBean : # of messages in group: 0
2020-11-03 17:05:42.420 INFO 4748 --- [ main] m.t.spring.integration.IntegrationApp : Started IntegrationApp in 6.557 seconds (JVM running for 7.532)
2020-11-03 17:05:47.533 INFO 4748 --- [pool-1-thread-1] m.t.spring.integration.IntegrationApp : Message received: ExampleMessage(id=d803b591-2f47-412d-9d64-e8efb424f393, timestamp=2020-11-03T15:43:34.253162Z)
2020-11-03 17:05:48.345 INFO 4748 --- [pool-1-thread-1] m.t.spring.integration.IntegrationApp : Message received: ExampleMessage(id=7361b7d6-5c36-4801-851e-6d61dc18ebb2, timestamp=2020-11-03T15:43:36.259450500Z)
2020-11-03 17:05:49.348 INFO 4748 --- [pool-1-thread-1] m.t.spring.integration.IntegrationApp : Message received: ExampleMessage(id=7fe98beb-bd0b-4fda-a2c2-bbab2d80d1e9, timestamp=2020-11-03T15:43:38.265175Z)
2020-11-03 17:05:50.347 INFO 4748 --- [pool-1-thread-1] m.t.spring.integration.IntegrationApp : Message received: ExampleMessage(id=375d92b6-8746-4478-911a-85b34c2ec2ab, timestamp=2020-11-03T15:43:40.270516100Z)
我想避免直接查询 MongoDB message-store
集合,而更喜欢使用 Spring 集成 API.
有人能解决这个问题吗?提前谢谢你。
如果您对具体的MessageGroup
感兴趣,可以使用BasicMessageGroupStore
合同的getMessageGroup()
API。
无论哪种方式,在 bean 初始化阶段访问 low-level 资源(如数据库)都不好,就像您在 channel
bean 定义中访问日志一样。您必须推迟这样的操作,直到整个应用程序上下文准备就绪。或捕获 ContextRefreshedEvent
,或执行 SmartLifecycle.start()
合同。
更新
事实证明,您对 MongoDbChannelMessageStore
及其 API 的看法是正确的。我们绝对无法访问该合约的消息:
/**
* Not fully used. Only wraps the provided group id.
*/
@Override
public MessageGroup getMessageGroup(Object groupId) {
return getMessageGroupFactory().create(groupId);
}
所以,我们只是创建了一个新的空组,没有任何钩子与 MongoDB 集合中的任何内容。
作为一种变通方法,我建议您将常规 ConfigurableMongoDbMessageStore
作为 bean 针对同一个 message-store
集合。这个已经为我们提供了一个 API 来迭代组和他们的消息。因此,您将使用 MongoDbChannelMessageStore
对 QueueChannel
进行主动操作,并使用常规 ConfigurableMongoDbMessageStore
读取集合内容。
我们可能需要考虑在 ChannelMessageStore
中实施此 API。看起来没有什么坏处...请随时提出 GH 问题以获得此改进!
我做了一些调试。我看不出 MongoDbChannelMessageStore
的方法
public MessageGroup getMessageGroup(Object groupId) {
return this.getMessageGroupFactory().create(groupId);
}
(我在 EventListenerBean
中使用)可以产生与空 MessageGroup
.
我试图通过 MongoDbChannelMessageStore
的以下初始化来改变这种行为:
@Bean
public MongoDbChannelMessageStore mongoDbChannelMessageStore(MongoDatabaseFactory mongoDatabaseFactory) {
MongoDbChannelMessageStore store = new MongoDbChannelMessageStore(mongoDatabaseFactory, "message-store");
SimpleMessageGroupFactory messageGroupFactory = new SimpleMessageGroupFactory(GroupType.PERSISTENT) {
@Override
public MessageGroup create(Object groupId) {
return this.create(store, groupId);
}
};
store.setLazyLoadMessageGroups(false);
store.setMessageGroupFactory(messageGroupFactory);
return store;
}
这是一个 hack,但在它最终出现在 AbstractConfigurableMongoDbMessageStore
的 getMessagesForGroup()
之前一直运行良好,不幸的是,它没有实现。