kafka-avro-console-consumer 的未知魔法字节

Unknown magic byte with kafka-avro-console-consumer

我一直在尝试将 kafka-avro-console-consumer 从 Confluent 连接到我们遗留的 Kafka 集群,该集群是在没有 Confluent Schema Registry 的情况下部署的。 我使用如下属性显式提供了架构:

kafka-console-consumer --bootstrap-server kafka02.internal:9092 \
    --topic test \
    --from-beginning \
    --property key.schema='{"type":"long"}' \
    --property value.schema='{"type":"long"}'

但我收到 'Unknown magic byte!' 错误 org.apache.kafka.common.errors.SerializationException

是否可以使用 Confluent kafka-avro-console-consumer 使用来自 Kafka 的 Avro 消息,这些消息未使用来自 Confluent 的 AvroSerializer 和 Schema Registry 进行序列化?

Confluent Schema Registry serialiser/deserializer 使用 wire format,它在消息的初始字节中包含有关模式 ID 等的信息。

如果您的消息尚未使用 Schema Registry 序列化程序进行序列化,那么您将无法使用它反序列化它,并且会收到 Unknown magic byte! 错误。

所以你需要编写一个消费者来提取消息,使用你的 Avro avsc 模式进行反序列化,然后假设你想保留数据,使用 Schema Registry serializer

编辑: 我最近写了一篇文章,更深入地解释了整个事情:https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained

kafka-console-consumer 不知道 key.schemavalue.schema,只有 Avro producer 知道。 Source code here

普通的控制台用户不关心数据的格式——它只会打印 UTF8 编码的字节

kafka-<b>avro</b>-console-consumer接受的属性只有schema.registry.url。所以,要回答这个问题,是的,它需要使用 Confluent 序列化程序进行序列化。