MongoDbMessageStore 无法确定 IsNewStrategy 错误

MongoDbMessageStore Could not determine IsNewStrategy Error

当我尝试在 IntegrationFlowBuilder 上使用延迟选项时出现以下错误

Unsupported entity org.springframework.integration.mongodb.store.MongoDbMessageStore$MessageWrapper! Could not determine IsNewStrategy.

我查看了 https://jira.spring.io/browse/INT-3346 的历史记录,它似乎是相关的,但该问题似乎已在我使用的版本中得到修复(我看到 _id 字段)。

当我逐步执行代码时,看起来好像使用了通过我的 MongoConfig 映射的映射上下文(和 IsNewStrategy),而不是 MongoDbMessageStore 中似乎配置的内容。因此,它可以访问我所有域实体(那些用@Document 注释的实体)的实体信息,但不能访问 MessageWrapper。

我的配置中是否遗漏了什么?有没有办法强制注册 MessageWrapper 实体?

使用
Spring 集成 - 4.2.4
Spring 数据 Mongo - 1.8.2

集成流程

IntegrationFlows.from("channel.in").delay("group1", 
     s -> s.defaultDelay(3000).messageStore(messageStore))
  .handle(<somehandler>).get()

消息存储配置

@Bean
public MessageGroupStore messageStore(MongoDbFactory mongoDbFactory) {
    return new MongoDbMessageStore(mongoDbFactory, "messageStoreCollection")
}

Mongo 配置

@Configuration
@EnableMongoAuditing
public class MongoConfig extends AbstractMongoConfiguration {

    @Bean
    public MongoClient mongo() throws UnknownHostException {
        MongoClientOptions.Builder builder = new MongoClientOptions.build();
        MongoClientOptions options = builder.connectionsPerHost(100).build();
        return new MongoClient(new ServerAddress("localhost", 28017)), options);
    }

    protected String getDatabaseName() { return "myMongoDb"; }

    protected String getMappingBasePackage() { return "com.mycompany.myapp.entities"; }

    @Bean
    public MongoTemplate mongoTemplate(MongoClient mongo, MongoConverter mongoConverter) throws UnknownHostException {
        return new MongoTemplate(mongoDbFactory(), mongoConverter);
    }

    @Bean 
    public GridFstemplate gridFsTemplate() throws UnknownHostException {
        return new GridFsTemplate(mongoDbFactory(), mappingMongoConverter());
    }

    @Bean
    public LocalValidatorFactoryBean localValidatorFactoryBean() {
        return new LocalValidatorFactoryBean();
    }
}

堆栈跟踪

java.lang.IllegalArgumentException: Unsupported entity org.springframework.integration.mongodb.store.MongoDbMessageStore$MessageWrapper! Could not determine IsNewStrategy.
    at org.springframework.data.support.IsNewStrategyFactorySupport.getIsNewStrategy(IsNewStrategyFactorySupport.java:48) ~[spring-data-commons-1.11.2.RELEASE.jar:na]
    at org.springframework.data.auditing.IsNewAwareAuditingHandler.markAudited(IsNewAwareAuditingHandler.java:80) ~[spring-data-commons-1.11.2.RELEASE.jar:na]
    at org.springframework.data.mongodb.core.mapping.event.AuditingEventListener.onApplicationEvent(AuditingEventListener.java:54) ~[spring-data-mongodb-1.8.2.RELEASE.jar:na]
    at org.springframework.data.mongodb.core.mapping.event.AuditingEventListener.onApplicationEvent(AuditingEventListener.java:31) ~[spring-data-mongodb-1.8.2.RELEASE.jar:na]
    at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:163) ~[spring-context-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:136) ~[spring-context-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:381) ~[spring-context-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:335) ~[spring-context-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at org.springframework.data.mongodb.core.MongoTemplate.maybeEmitEvent(MongoTemplate.java:1627) ~[spring-data-mongodb-1.8.2.RELEASE.jar:na]
    at org.springframework.data.mongodb.core.MongoTemplate.doInsert(MongoTemplate.java:799) ~[spring-data-mongodb-1.8.2.RELEASE.jar:na]
    at org.springframework.data.mongodb.core.MongoTemplate.insert(MongoTemplate.java:743) ~[spring-data-mongodb-1.8.2.RELEASE.jar:na]
    at org.springframework.integration.mongodb.store.MongoDbMessageStore.addMessageDocument(MongoDbMessageStore.java:220) ~[spring-integration-mongodb-4.2.4.RELEASE.jar:na]
    at org.springframework.integration.mongodb.store.MongoDbMessageStore.addMessageToGroup(MongoDbMessageStore.java:302) ~[spring-integration-mongodb-4.2.4.RELEASE.jar:na]
    at org.springframework.integration.handler.DelayHandler.releaseMessageAfterDelay(DelayHandler.java:334) ~[spring-integration-core-4.2.4.RELEASE.jar:na]
    at org.springframework.integration.handler.DelayHandler.handleRequestMessage(DelayHandler.java:267) ~[spring-integration-core-4.2.4.RELEASE.jar:na]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:99) ~[spring-integration-core-4.2.4.RELEASE.jar:na]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.2.4.RELEASE.jar:na]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-4.2.4.RELEASE.jar:na]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:147) ~[spring-integration-core-4.2.4.RELEASE.jar:na]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:120) ~[spring-integration-core-4.2.4.RELEASE.jar:na]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) ~[spring-integration-core-4.2.4.RELEASE.jar:na]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442) ~[spring-integration-core-4.2.4.RELEASE.jar:na]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:392) ~[spring-integration-core-4.2.4.RELEASE.jar:na]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:231) ~[spring-integration-core-4.2.4.RELEASE.jar:na]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:154) ~[spring-integration-core-4.2.4.RELEASE.jar:na]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:102) ~[spring-integration-core-4.2.4.RELEASE.jar:na]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:105) ~[spring-integration-core-4.2.4.RELEASE.jar:na]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.2.4.RELEASE.jar:na]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-4.2.4.RELEASE.jar:na]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:147) ~[spring-integration-core-4.2.4.RELEASE.jar:na]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:120) ~[spring-integration-core-4.2.4.RELEASE.jar:na]
    at org.springframework.integration.amqp.channel.AbstractSubscribableAmqpChannel$DispatchingMessageListener.onMessage(AbstractSubscribableAmqpChannel.java:181) ~[spring-integration-amqp-4.2.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:800) [spring-rabbit-1.5.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:691) [spring-rabbit-1.5.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access[=13=]1(SimpleMessageListenerContainer.java:93) [spring-rabbit-1.5.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:183) ~[spring-rabbit-1.5.3.RELEASE.jar:na]
    at sun.reflect.GeneratedMethodAccessor178.invoke(Unknown Source) ~[na:na]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[na:1.8.0_40]
    at java.lang.reflect.Method.invoke(Unknown Source) ~[na:1.8.0_40]
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:302) ~[spring-aop-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190) [spring-aop-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157) [spring-aop-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at org.springframework.retry.interceptor.RetryOperationsInterceptor.doWithRetry(RetryOperationsInterceptor.java:74) ~[spring-retry-1.1.2.RELEASE.jar:na]
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:263) [spring-retry-1.1.2.RELEASE.jar:na]
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:168) [spring-retry-1.1.2.RELEASE.jar:na]
    at org.springframework.retry.interceptor.RetryOperationsInterceptor.invoke(RetryOperationsInterceptor.java:98) [spring-retry-1.1.2.RELEASE.jar:na]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) [spring-aop-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:208) [spring-aop-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at com.sun.proxy.$Proxy213.invokeListener(Unknown Source) [na:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1358) [spring-rabbit-1.5.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:661) [spring-rabbit-1.5.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1102) [spring-rabbit-1.5.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1086) [spring-rabbit-1.5.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access00(SimpleMessageListenerContainer.java:93) [spring-rabbit-1.5.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1203) [spring-rabbit-1.5.3.RELEASE.jar:na]
    at java.lang.Thread.run(Unknown Source) [na:1.8.0_40]

嗯,看起来 Spring 数据 MongoDB(或通用)中的某些内容已更改。

请就此事提出 JIRA,我们会调查如何处理。

但目前的 Spring 数据看起来我们无法从 Spring 集成方面做任何事情。

同时考虑使用 ConfigurableMongoDbMessageStore 并为 Message:

注入带有一些自定义 ConvertermappingMongoConverter()
public static class MessageReadConverter implements Converter<DBObject, Message<?>> {

    @Override
    @SuppressWarnings("unchecked")
    public Message<?> convert(DBObject source) {
        return MessageBuilder.withPayload(source.get("payload")).copyHeaders((Map<String, ?>) source.get("headers")).build();
    }

}

...
@Bean
public CustomConversions customConversions() {
    return new CustomConversions(Collections.singletonList(new MessageReadConverter()));
}

至少这就像我使用 Spring 启动一样,其中 MappingMongoConverter 是由 MongoDataAutoConfiguration:

为我提供的
@Bean
@ConditionalOnMissingBean(MongoConverter.class)
public MappingMongoConverter mappingMongoConverter(MongoDbFactory factory,
        MongoMappingContext context, BeanFactory beanFactory) {
    DbRefResolver dbRefResolver = new DefaultDbRefResolver(factory);
    MappingMongoConverter mappingConverter = new MappingMongoConverter(dbRefResolver,
            context);
    try {
        mappingConverter
                .setCustomConversions(beanFactory.getBean(CustomConversions.class));
    }
    catch (NoSuchBeanDefinitionException ex) {
        // Ignore
    }
    return mappingConverter;
}