Kafka 流在 StreamTask 中解码时间戳元数据失败
Kafka streams fail on decoding timestamp metadata inside StreamTask
我们在启动应用程序期间在 Kafka Streams 上遇到了奇怪的错误
java.lang.IllegalArgumentException: Illegal base64 character 7b
at java.base/java.util.Base64$Decoder.decode0(Base64.java:743)
at java.base/java.util.Base64$Decoder.decode(Base64.java:535)
at java.base/java.util.Base64$Decoder.decode(Base64.java:558)
at org.apache.kafka.streams.processor.internals.StreamTask.decodeTimestamp(StreamTask.java:985)
at org.apache.kafka.streams.processor.internals.StreamTask.initializeTaskTime(StreamTask.java:303)
at org.apache.kafka.streams.processor.internals.StreamTask.initializeMetadata(StreamTask.java:265)
at org.apache.kafka.streams.processor.internals.AssignedTasks.initializeNewTasks(AssignedTasks.java:71)
at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:385)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:769)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
因此,关于流失败的错误:ERROR KafkaStreams - stream-client [xxx] All stream threads have died. The instance will be in error state and should be closed.
根据 org.apache.kafka.streams.processor.internals.StreamTask
中的代码,失败是由于解码时间戳元数据 (StreamTask.decodeTimestamp()
) 时出错。它发生在产品上,无法在舞台上重现。
此类错误的根本原因可能是什么?
额外信息:我们的应用程序使用 Kafka-Streams 并使用相同的 application.id
和 state.dir
使用来自多个 kafka 代理的消息(实际上我们从一个代理切换到另一个代理,但在一段时间内我们连接到两个经纪人,所以我们有两个卡夫卡流,每个经纪人一个)。据我了解,消费者群体生活在经纪人方面(所以应该不是问题),但状态目录在客户端。也许由于对两个 kafka 流使用相同的 state.dir
而发生了一些竞争条件?这可能是根本原因吗?
我们使用 kafka-streams
v.2.4.0
、kafka-clients
v.2.4.0
、Kafka Broker v.1.1.1
,配置如下:
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.timestamp.extractor: org.apache.kafka.streams.processor.WallclockTimestampExtractor
default.deserialization.exception.handler: org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
commit.interval.ms: 5000
num.stream.threads: 1
auto.offset.reset: latest
最后,我们找出了某些消费者群体元数据损坏的根本原因。
它是我们的内部监控工具之一(用 pykafka
编写),暂时不活跃的消费者群体破坏了元数据。
元数据未加密且包含无效数据,如下所示:{"consumer_id": "", "hostname": "monitoring-xxx"}
。
为了了解我们在消费者元数据中到底有什么,我们可以使用以下代码:
Map<String, Object> config = Map.of( "group.id", "...", "bootstrap.servers", "...");
String topicName = "...";
Consumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<byte[], byte[]>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
Set<TopicPartition> topicPartitions = kafkaConsumer.partitionsFor(topicName).stream()
.map(partitionInfo -> new TopicPartition(topicName, partitionInfo.partition()))
.collect(Collectors.toSet());
kafkaConsumer.committed(topicPartitions).forEach((key, value) ->
System.out.println("Partition: " + key + " metadata: " + (value != null ? value.metadata() : null)));
修复已损坏的元数据的几个选项:
- 将消费者组更改为新消费者组。请注意,根据
latest
或 earliest
偏移重置策略,您可能会丢失或重复消息。所以在某些情况下,这个选项可能是不可接受的
手动覆盖元数据(时间戳根据StreamTask.decodeTimestamp()
中的逻辑编码):
Map<TopicPartition, OffsetAndMetadata> updatedTopicPartitionToOffsetMetadataMap = kafkaConsumer.committed(topicPartitions).entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, (entry) -> new OffsetAndMetadata((entry.getValue()).offset(), "AQAAAXGhcf01")));
kafkaConsumer.commitSync(updatedTopicPartitionToOffsetMetadataMap);
或将元数据指定为 Af//////////
,在 Kafka Streams 中表示 NO_TIMESTAMP
。
我们在启动应用程序期间在 Kafka Streams 上遇到了奇怪的错误
java.lang.IllegalArgumentException: Illegal base64 character 7b
at java.base/java.util.Base64$Decoder.decode0(Base64.java:743)
at java.base/java.util.Base64$Decoder.decode(Base64.java:535)
at java.base/java.util.Base64$Decoder.decode(Base64.java:558)
at org.apache.kafka.streams.processor.internals.StreamTask.decodeTimestamp(StreamTask.java:985)
at org.apache.kafka.streams.processor.internals.StreamTask.initializeTaskTime(StreamTask.java:303)
at org.apache.kafka.streams.processor.internals.StreamTask.initializeMetadata(StreamTask.java:265)
at org.apache.kafka.streams.processor.internals.AssignedTasks.initializeNewTasks(AssignedTasks.java:71)
at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:385)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:769)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
因此,关于流失败的错误:ERROR KafkaStreams - stream-client [xxx] All stream threads have died. The instance will be in error state and should be closed.
根据 org.apache.kafka.streams.processor.internals.StreamTask
中的代码,失败是由于解码时间戳元数据 (StreamTask.decodeTimestamp()
) 时出错。它发生在产品上,无法在舞台上重现。
此类错误的根本原因可能是什么?
额外信息:我们的应用程序使用 Kafka-Streams 并使用相同的 application.id
和 state.dir
使用来自多个 kafka 代理的消息(实际上我们从一个代理切换到另一个代理,但在一段时间内我们连接到两个经纪人,所以我们有两个卡夫卡流,每个经纪人一个)。据我了解,消费者群体生活在经纪人方面(所以应该不是问题),但状态目录在客户端。也许由于对两个 kafka 流使用相同的 state.dir
而发生了一些竞争条件?这可能是根本原因吗?
我们使用 kafka-streams
v.2.4.0
、kafka-clients
v.2.4.0
、Kafka Broker v.1.1.1
,配置如下:
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.timestamp.extractor: org.apache.kafka.streams.processor.WallclockTimestampExtractor
default.deserialization.exception.handler: org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
commit.interval.ms: 5000
num.stream.threads: 1
auto.offset.reset: latest
最后,我们找出了某些消费者群体元数据损坏的根本原因。
它是我们的内部监控工具之一(用 pykafka
编写),暂时不活跃的消费者群体破坏了元数据。
元数据未加密且包含无效数据,如下所示:{"consumer_id": "", "hostname": "monitoring-xxx"}
。
为了了解我们在消费者元数据中到底有什么,我们可以使用以下代码:
Map<String, Object> config = Map.of( "group.id", "...", "bootstrap.servers", "...");
String topicName = "...";
Consumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<byte[], byte[]>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
Set<TopicPartition> topicPartitions = kafkaConsumer.partitionsFor(topicName).stream()
.map(partitionInfo -> new TopicPartition(topicName, partitionInfo.partition()))
.collect(Collectors.toSet());
kafkaConsumer.committed(topicPartitions).forEach((key, value) ->
System.out.println("Partition: " + key + " metadata: " + (value != null ? value.metadata() : null)));
修复已损坏的元数据的几个选项:
- 将消费者组更改为新消费者组。请注意,根据
latest
或earliest
偏移重置策略,您可能会丢失或重复消息。所以在某些情况下,这个选项可能是不可接受的 手动覆盖元数据(时间戳根据
StreamTask.decodeTimestamp()
中的逻辑编码):Map<TopicPartition, OffsetAndMetadata> updatedTopicPartitionToOffsetMetadataMap = kafkaConsumer.committed(topicPartitions).entrySet().stream() .collect(Collectors.toMap(Map.Entry::getKey, (entry) -> new OffsetAndMetadata((entry.getValue()).offset(), "AQAAAXGhcf01"))); kafkaConsumer.commitSync(updatedTopicPartitionToOffsetMetadataMap);
或将元数据指定为Af//////////
,在 Kafka Streams 中表示NO_TIMESTAMP
。