Spring 集成推荐的使用基于键的查询丰富事件的方法
Spring Integration recommended way of enriching events with key based queries
我想从 Kafka 中读取数据,并且在每个事件中,使用 Id 和 Kafka 事件中的另一个字段从 MongoDB 中读取。我想知道通常推荐的方法是什么,以及是否可以使用 ReactiveMongoDbMessageSource
来做到这一点。我想也许正确的运算符是 .gateway()
或 .enrich()
但我真的不确定。我真的不知道如何将它与消息源一起使用,所以我不确定它是否可能。我希望能够写出这样的东西:
@Override
protected IntegrationFlowDefinition<?> buildFlow() {
return from(reactiveKafkaConsumerTemplate.receiveAutoAck()
.map(GenericMessage::new))
.<ConsumerRecord<String, String>, String>transform(ConsumerRecord::value)
.gateway((message) -> enrichMongoDbPayloadByMessageKey(message.getHeaders().getId())
.handle(new ReactiveElasticsearchMessageHandler());
}
我真的很想看看我需要的模拟实现的示例 enrichMongoDbPayloadByMessageKey()
。
gateway()
或 enricher()
是正确的方向,具体取决于您的要求,如果您希望仅使用 MongoDb 请求的结果继续流程,或者您希望向 transform()
.
的结果添加更多数据
ReactiveMongoDbMessageSource
在这里是一个错误的方向,因为它被用作消息的来源 - 流的开始。在您的情况下,它实际上是一个基于从 Kafka 收到的结果的服务激活器。
没有(还)反应式 MongoDb 网关(request-reply 通道适配器),但最接近的 out-of-the-box 解决方案是 MongoDbOutboundGateway
:https://docs.spring.io/spring-integration/docs/current/reference/html/mongodb.html#mongodb-outbound-gateway .
如果您真的想在这里处理反应式解决方案,请考虑实现将接收您的参数的服务方法,对 MongoDB 和 return 执行反应式操作。看到那个目标 ReactiveMongoTemplate.findOne(Query query, Class<T> entityClass)
.
gateway()
运算符中没有您显示的签名。
使用 message.getHeaders().getId()
也是错误的,因为它不会反映您从 Kafka 收到的任何信息。
查看有关网关和增强器的更多文档:
https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-gateway
我想从 Kafka 中读取数据,并且在每个事件中,使用 Id 和 Kafka 事件中的另一个字段从 MongoDB 中读取。我想知道通常推荐的方法是什么,以及是否可以使用 ReactiveMongoDbMessageSource
来做到这一点。我想也许正确的运算符是 .gateway()
或 .enrich()
但我真的不确定。我真的不知道如何将它与消息源一起使用,所以我不确定它是否可能。我希望能够写出这样的东西:
@Override
protected IntegrationFlowDefinition<?> buildFlow() {
return from(reactiveKafkaConsumerTemplate.receiveAutoAck()
.map(GenericMessage::new))
.<ConsumerRecord<String, String>, String>transform(ConsumerRecord::value)
.gateway((message) -> enrichMongoDbPayloadByMessageKey(message.getHeaders().getId())
.handle(new ReactiveElasticsearchMessageHandler());
}
我真的很想看看我需要的模拟实现的示例 enrichMongoDbPayloadByMessageKey()
。
gateway()
或 enricher()
是正确的方向,具体取决于您的要求,如果您希望仅使用 MongoDb 请求的结果继续流程,或者您希望向 transform()
.
ReactiveMongoDbMessageSource
在这里是一个错误的方向,因为它被用作消息的来源 - 流的开始。在您的情况下,它实际上是一个基于从 Kafka 收到的结果的服务激活器。
没有(还)反应式 MongoDb 网关(request-reply 通道适配器),但最接近的 out-of-the-box 解决方案是 MongoDbOutboundGateway
:https://docs.spring.io/spring-integration/docs/current/reference/html/mongodb.html#mongodb-outbound-gateway .
如果您真的想在这里处理反应式解决方案,请考虑实现将接收您的参数的服务方法,对 MongoDB 和 return 执行反应式操作。看到那个目标 ReactiveMongoTemplate.findOne(Query query, Class<T> entityClass)
.
gateway()
运算符中没有您显示的签名。
使用 message.getHeaders().getId()
也是错误的,因为它不会反映您从 Kafka 收到的任何信息。
查看有关网关和增强器的更多文档:
https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-gateway