按键分组 Ktable 后无效(负)时间戳
Invalid (negative) timestamp after grouping Ktable by key
我正在使用 KakfkaStreams (2.3.0) 通过 Stream 和 Ktable 之间的连接(压缩主题)来丰富一些价值。
问题是压缩主题正在使用与流主题不同的分区程序写入,因此连接无法按预期工作(某些键不匹配,因为它们位于不同的分区中)。
经纪商版本为 0.10.2。
我开始研究通过使用 groupBy() 后跟 reduce() 来重新划分压缩的主题,但是当它开始读取创建的重新分区主题,它开始抛出 StreamsException 消息:
Input record ConsumerRecord(topic = mappings-table-repartition, partition = 18, leaderEpoch = null, offset = 0, CreateTime = -1, serialized key size = 37, serialized value size = 20, headers = RecordHeaders(headers = [], isReadOnly = false), key = 0cecdec3863208e57, value = (1126999878035640323<-null)) has invalid (negative) timestamp. Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, or because the input topic was created before upgrading the Kafka cluster to 0.10+. Use a different TimestampExtractor to process this data.
重新分区是由 KafkaStreams 自动创建和填充的,所以这看起来很奇怪,因为它会写入无效记录。
在阅读最初的紧凑主题时,我什至尝试提供自定义时间戳提取器,但它没有任何区别。堆栈跟踪似乎表明它正在使用内部 TimestampExtractor(并查看它确实是的代码)
at org.apache.kafka.streams.processor.FailOnInvalidTimestamp.onInvalidTimestamp(FailOnInvalidTimestamp.java:73)
at org.apache.kafka.streams.processor.ExtractRecordMetadataTimestamp.extract(ExtractRecordMetadataTimestamp.java:61)
at org.apache.kafka.streams.processor.FailOnInvalidTimestamp.extract(FailOnInvalidTimestamp.java:48)
at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:167)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:100)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:136)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:746)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:1023)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:861)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
代码是这样的:
KTable<String, String> table = streamsBuilder
.table(mappingsTopic,
Consumed.with(Serdes.String(), Serdes.String(),
null,
Topology.AutoOffsetReset.EARLIEST))
.groupBy(KeyValue::pair,
Grouped.with("mappings-table", Serdes.String(), Serdes.String()))
.reduce((value1, value2) -> value2, (value1, value2) -> value2)
如错误消息所示,您需要升级代理使用的消息格式(参见代理配置 log.message.format.version
)
从 Kafka Streams 1.0 开始,需要消息格式 0.10 或更新版本。
发件人:https://kafka.apache.org/23/documentation/streams/upgrade-guide
Note, that a brokers must be on version 0.10.1 or higher to run a Kafka Streams application version 0.10.1 or higher; On-disk message format must be 0.10 or higher to run a Kafka Streams application version 1.0 or higher. For Kafka Streams 0.10.0, broker version 0.10.0 or higher is required.
Kafka Streams尝试在写入时设置记录时间戳,但是,旧的消息格式不支持时间戳(在写入时,消息格式降级时时间戳会丢失)。因此,当消息格式转换回来时,在读取时将虚拟时间戳 -1
放入消息中。
不允许为重新分区主题设置不同的时间戳提取器,因为上游时间戳必须转发到下游才能正确。
我正在使用 KakfkaStreams (2.3.0) 通过 Stream 和 Ktable 之间的连接(压缩主题)来丰富一些价值。 问题是压缩主题正在使用与流主题不同的分区程序写入,因此连接无法按预期工作(某些键不匹配,因为它们位于不同的分区中)。
经纪商版本为 0.10.2。
我开始研究通过使用 groupBy() 后跟 reduce() 来重新划分压缩的主题,但是当它开始读取创建的重新分区主题,它开始抛出 StreamsException 消息:
Input record ConsumerRecord(topic = mappings-table-repartition, partition = 18, leaderEpoch = null, offset = 0, CreateTime = -1, serialized key size = 37, serialized value size = 20, headers = RecordHeaders(headers = [], isReadOnly = false), key = 0cecdec3863208e57, value = (1126999878035640323<-null)) has invalid (negative) timestamp. Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, or because the input topic was created before upgrading the Kafka cluster to 0.10+. Use a different TimestampExtractor to process this data.
重新分区是由 KafkaStreams 自动创建和填充的,所以这看起来很奇怪,因为它会写入无效记录。 在阅读最初的紧凑主题时,我什至尝试提供自定义时间戳提取器,但它没有任何区别。堆栈跟踪似乎表明它正在使用内部 TimestampExtractor(并查看它确实是的代码)
at org.apache.kafka.streams.processor.FailOnInvalidTimestamp.onInvalidTimestamp(FailOnInvalidTimestamp.java:73) at org.apache.kafka.streams.processor.ExtractRecordMetadataTimestamp.extract(ExtractRecordMetadataTimestamp.java:61) at org.apache.kafka.streams.processor.FailOnInvalidTimestamp.extract(FailOnInvalidTimestamp.java:48) at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:167) at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:100) at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:136) at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:746) at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:1023) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:861) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
代码是这样的:
KTable<String, String> table = streamsBuilder
.table(mappingsTopic,
Consumed.with(Serdes.String(), Serdes.String(),
null,
Topology.AutoOffsetReset.EARLIEST))
.groupBy(KeyValue::pair,
Grouped.with("mappings-table", Serdes.String(), Serdes.String()))
.reduce((value1, value2) -> value2, (value1, value2) -> value2)
如错误消息所示,您需要升级代理使用的消息格式(参见代理配置 log.message.format.version
)
从 Kafka Streams 1.0 开始,需要消息格式 0.10 或更新版本。
发件人:https://kafka.apache.org/23/documentation/streams/upgrade-guide
Note, that a brokers must be on version 0.10.1 or higher to run a Kafka Streams application version 0.10.1 or higher; On-disk message format must be 0.10 or higher to run a Kafka Streams application version 1.0 or higher. For Kafka Streams 0.10.0, broker version 0.10.0 or higher is required.
Kafka Streams尝试在写入时设置记录时间戳,但是,旧的消息格式不支持时间戳(在写入时,消息格式降级时时间戳会丢失)。因此,当消息格式转换回来时,在读取时将虚拟时间戳 -1
放入消息中。
不允许为重新分区主题设置不同的时间戳提取器,因为上游时间戳必须转发到下游才能正确。