如何抓取不符合kafka stream join条件的kafka记录?
How to capture kafka records that don't match the condition of the kafka stream join?
我正在通过加入 kstream 和 ktable 来丰富数据。 kstream 包含车辆发送的消息,ktable 包含车辆数据。
我遇到的问题是我想从 table 中没有相应连接键的流中捕获消息。
Kafka 流静默地跳过他们没有连接匹配的记录。
有什么方法可以将这些记录发送到不同的主题,以便稍后处理它们吗?
StreamsBuilder builder = new StreamsBuilder();
final KTable<String, VinMappingInfo> vinMappingTable = builder.table(vinInfoTopic, Consumed.with(Serdes.String(), valueSerde));
KStream<String, VehicleMessage> vehicleStream = builder.stream(sourceTopic);
vehicleStream.join(vinMappingTable, (vehicleMsg, vinInfo) -> {
log.info("joining {} with vin info {}", vehicleMsg.getPayload().getId(), vinInfo.data.vin);
vehicleMsg.setVin(vinInfo.data.vin);
return vehicleMsg;
}, Joined.with(null, null, valueSerde))
.to(destinationTopic);
final Topology topology = builder.build();
log.info("The topology of connected processor nodes: \n {}", topology.describe());
KafkaStreams streams = new KafkaStreams(topology, config);
streams.cleanUp();
streams.start();
您可以使用 left-join:
stream.leftJoin(table,...);
这确保了输入流中的所有记录都在输出流中。在这种情况下,ValueJoiner
将与 apply(streamValue, null)
一起调用。
我正在通过加入 kstream 和 ktable 来丰富数据。 kstream 包含车辆发送的消息,ktable 包含车辆数据。 我遇到的问题是我想从 table 中没有相应连接键的流中捕获消息。 Kafka 流静默地跳过他们没有连接匹配的记录。 有什么方法可以将这些记录发送到不同的主题,以便稍后处理它们吗?
StreamsBuilder builder = new StreamsBuilder();
final KTable<String, VinMappingInfo> vinMappingTable = builder.table(vinInfoTopic, Consumed.with(Serdes.String(), valueSerde));
KStream<String, VehicleMessage> vehicleStream = builder.stream(sourceTopic);
vehicleStream.join(vinMappingTable, (vehicleMsg, vinInfo) -> {
log.info("joining {} with vin info {}", vehicleMsg.getPayload().getId(), vinInfo.data.vin);
vehicleMsg.setVin(vinInfo.data.vin);
return vehicleMsg;
}, Joined.with(null, null, valueSerde))
.to(destinationTopic);
final Topology topology = builder.build();
log.info("The topology of connected processor nodes: \n {}", topology.describe());
KafkaStreams streams = new KafkaStreams(topology, config);
streams.cleanUp();
streams.start();
您可以使用 left-join:
stream.leftJoin(table,...);
这确保了输入流中的所有记录都在输出流中。在这种情况下,ValueJoiner
将与 apply(streamValue, null)
一起调用。