删除下游 ChangeLog 对象 KafkaStreams
Delete Downstream ChangeLog Objects KafkaStreams
我正在尝试删除值为 null 的记录,在下游变更日志中,我知道在 statestore 中它们只是因为为 null(墓碑)而被删除,但是当您对 KTable 或 Stream 进行聚合时,他们跳过 null 并且不删除它。我需要想办法在聚合中设置删除标志,让 Kafka 知道可以删除记录。这是我的代码:
public void deleteByEntity(String inputTopic, String target, String stateStoreName) {
// Need to set property to true in application.properties
// if ("true".equals(utils.getProperty(ApplicationConfigs.KAFKA_DELETE_BY_ENTITY))) {
Materialized<String, String, KeyValueStore<Bytes, byte[]>> storeName =
Materialized.as(stateStoreName);
StreamsBuilder streamsBuilder = new StreamsBuilder();
KStream<String, String> docStream = streamsBuilder.stream(inputTopic);
KTable<?, ?> dataInTable =
docStream
.groupByKey()
.reduce(
(value1, value2) -> {
// System.out.println("aa");
if (value1.equals(target)) {
// If key equals target topic return null, creates tombstone deletes from
// statestore, sends null record downstream
return null;
}
return value2;
},
storeName);
// System.out.println(dataInTable);
}
谢谢
如果您 return null
从您的 Reducer
它会从存储中删除数据 和 它会发送相应的输出记录 <key,null>
.因此,不需要下游处理。
请注意,null
键和 null
值仅在 输入 记录到 reduce()
时被忽略。
我正在尝试删除值为 null 的记录,在下游变更日志中,我知道在 statestore 中它们只是因为为 null(墓碑)而被删除,但是当您对 KTable 或 Stream 进行聚合时,他们跳过 null 并且不删除它。我需要想办法在聚合中设置删除标志,让 Kafka 知道可以删除记录。这是我的代码:
public void deleteByEntity(String inputTopic, String target, String stateStoreName) {
// Need to set property to true in application.properties
// if ("true".equals(utils.getProperty(ApplicationConfigs.KAFKA_DELETE_BY_ENTITY))) {
Materialized<String, String, KeyValueStore<Bytes, byte[]>> storeName =
Materialized.as(stateStoreName);
StreamsBuilder streamsBuilder = new StreamsBuilder();
KStream<String, String> docStream = streamsBuilder.stream(inputTopic);
KTable<?, ?> dataInTable =
docStream
.groupByKey()
.reduce(
(value1, value2) -> {
// System.out.println("aa");
if (value1.equals(target)) {
// If key equals target topic return null, creates tombstone deletes from
// statestore, sends null record downstream
return null;
}
return value2;
},
storeName);
// System.out.println(dataInTable);
}
谢谢
如果您 return null
从您的 Reducer
它会从存储中删除数据 和 它会发送相应的输出记录 <key,null>
.因此,不需要下游处理。
请注意,null
键和 null
值仅在 输入 记录到 reduce()
时被忽略。