正确地从 Kafka ktable 中删除记录
correctly delete record from Kafka ktable
我想从 KTable 中正确删除对象。
我有以下流处理数据:
input
.map(this::deserialize)
.filter(((key, event) ->
Arrays.asList(
"AssessmentInitialized",
"AssessmentRenamed",
"AssessmentDeleted"
).contains(event.eventType())))
.map( ... do mapping ... )
.groupBy((key, event) -> key, Serialized.with(Serdes.String(), domainEventSerde))
.aggregate(
Assessment::new,
(key, event, assessment) -> assessment.handleEvent(event),
Materialized.<String, Assessment, KeyValueStore<Bytes, byte[]>>as(ASSESSMENTS_STORE)
.withKeySerde(Serdes.String())
.withValueSerde(assessmentSerde)
);
和 assessment.handleEvent(event)
returns null
当它处理 AssessmentDeleted
事件时。 Serde 是 this.assessmentSerde = new JsonSerde<>(Assessment.class, objectMapper);
,其中 mapper 是默认的 com.fasterxml.jackson.databind.ObjectMapper bean。
此流处理事件后,我在 Kafka KTable 更改日志中看到以下值:
{
"topic": "events-consumer-v1-assessments-store-changelog",
"key": "5d6d7a70-c907-460f-ac88-69705f079939",
"value": "ée",
"partition": 0,
"offset": 2
}
它看起来不像是我想要的东西。这种删除对象的方式是否正确?我预计如果我将 null 作为值推送到聚合操作,它将被删除,但看起来像留下了一些垃圾,我不确定它的映射器问题或不正确的删除方式或正确的 KTable 行为。
在你的情况下,问题似乎出在检查工具中。由于某种原因,它没有正确反序列化 null
值。
总是最好先使用 Kafka 工具检查它 (kafka-console-consumer.sh
, kafka-console-producer.sh
).
我想从 KTable 中正确删除对象。
我有以下流处理数据:
input
.map(this::deserialize)
.filter(((key, event) ->
Arrays.asList(
"AssessmentInitialized",
"AssessmentRenamed",
"AssessmentDeleted"
).contains(event.eventType())))
.map( ... do mapping ... )
.groupBy((key, event) -> key, Serialized.with(Serdes.String(), domainEventSerde))
.aggregate(
Assessment::new,
(key, event, assessment) -> assessment.handleEvent(event),
Materialized.<String, Assessment, KeyValueStore<Bytes, byte[]>>as(ASSESSMENTS_STORE)
.withKeySerde(Serdes.String())
.withValueSerde(assessmentSerde)
);
和 assessment.handleEvent(event)
returns null
当它处理 AssessmentDeleted
事件时。 Serde 是 this.assessmentSerde = new JsonSerde<>(Assessment.class, objectMapper);
,其中 mapper 是默认的 com.fasterxml.jackson.databind.ObjectMapper bean。
此流处理事件后,我在 Kafka KTable 更改日志中看到以下值:
{
"topic": "events-consumer-v1-assessments-store-changelog",
"key": "5d6d7a70-c907-460f-ac88-69705f079939",
"value": "ée",
"partition": 0,
"offset": 2
}
它看起来不像是我想要的东西。这种删除对象的方式是否正确?我预计如果我将 null 作为值推送到聚合操作,它将被删除,但看起来像留下了一些垃圾,我不确定它的映射器问题或不正确的删除方式或正确的 KTable 行为。
在你的情况下,问题似乎出在检查工具中。由于某种原因,它没有正确反序列化 null
值。
总是最好先使用 Kafka 工具检查它 (kafka-console-consumer.sh
, kafka-console-producer.sh
).