Spring 集成推荐的使用基于键的查询丰富事件的方法

Spring Integration recommended way of enriching events with key based queries

我想从 Kafka 中读取数据,并且在每个事件中,使用 Id 和 Kafka 事件中的另一个字段从 MongoDB 中读取。我想知道通常推荐的方法是什么,以及是否可以使用 ReactiveMongoDbMessageSource 来做到这一点。我想也许正确的运算符是 .gateway().enrich() 但我真的不确定。我真的不知道如何将它与消息源一起使用,所以我不确定它是否可能。我希望能够写出这样的东西:

    @Override
    protected IntegrationFlowDefinition<?> buildFlow() {
        return from(reactiveKafkaConsumerTemplate.receiveAutoAck()
                        .map(GenericMessage::new))
            .<ConsumerRecord<String, String>, String>transform(ConsumerRecord::value)
            .gateway((message) -> enrichMongoDbPayloadByMessageKey(message.getHeaders().getId())
            .handle(new ReactiveElasticsearchMessageHandler());
    }

我真的很想看看我需要的模拟实现的示例 enrichMongoDbPayloadByMessageKey()

gateway()enricher() 是正确的方向,具体取决于您的要求,如果您希望仅使用 MongoDb 请求的结果继续流程,或者您希望向 transform().

的结果添加更多数据

ReactiveMongoDbMessageSource 在这里是一个错误的方向,因为它被用作消息的来源 - 流的开始。在您的情况下,它实际上是一个基于从 Kafka 收到的结果的服务激活器。

没有(还)反应式 MongoDb 网关(request-reply 通道适配器),但最接近的 out-of-the-box 解决方案是 MongoDbOutboundGatewayhttps://docs.spring.io/spring-integration/docs/current/reference/html/mongodb.html#mongodb-outbound-gateway .

如果您真的想在这里处理反应式解决方案,请考虑实现将接收您的参数的服务方法,对 MongoDB 和 return 执行反应式操作。看到那个目标 ReactiveMongoTemplate.findOne(Query query, Class<T> entityClass).

gateway() 运算符中没有您显示的签名。 使用 message.getHeaders().getId() 也是错误的,因为它不会反映您从 Kafka 收到的任何信息。

查看有关网关和增强器的更多文档:

https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-gateway

https://docs.spring.io/spring-integration/docs/current/reference/html/message-transformation.html#payload-enricher