从 KSQL Stream 使用 AVRO Kafka Topic 时出错

Error while consuming AVRO Kafka Topic from KSQL Stream

我在 KSQLDB 中创建了一些虚拟数据作为流 VALUE_FORMAT='JSON' TOPIC='MYTOPIC'

安装结束 Docker-撰写。我是 运行 Kafka Broker、Schema-registry、ksqldbcli、ksqldb-server、zookeeper

现在我想使用主题中的这些记录。 我的第一个也是最后一个方法是在命令行上使用以下命令

docker run  --net=host  --rm  confluentinc/cp-schema-registry:5.0.0  kafka-avro-console-consumer
--bootstrap-server localhost:29092 --topic DXT --from-beginning --max-messages 10
--property print.key=true --property print.value=true
--value-deserializer io.confluent.kafka.serializers.KafkaAvroDeserializer
--key-deserializer org.apache.kafka.common.serialization.StringDeserializer

但这只是 returns 错误

[2021-04-22 21:45:42,926] ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$:76)
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

我也在 Java Spring 的不同用例中尝试过,但没有成功。我只是无法使用创建的主题。 如果我需要定义我自己的模式,我应该在哪里做,什么是最简单的方法,因为我刚刚在 Ksqldb 中创建了一个流? 是否有一个易于遵循的示例。当我像 Ksqldb.io 上的快速入门示例一样创建流时,我没有指定任何其他内容。 (我在部署中添加了架构注册表) 由于我是一个菜鸟,在这里坐了将近 10 个小时,我们将不胜感激。

编辑:我发现纯 JSON 不需要带有 ksqldb 的 Schema-registry。 Here。 但是如何反序列化呢?

如果您已向主题写入 JSON 数据,则可以使用 kafka-console-consumer 阅读它。

您遇到的错误 (Error deserializing Avro message for id -1…Unknown magic byte!) 是因为您使用的 kafka-avro-console-consumer 试图将主题数据反序列化为 Avro - 但事实并非如此,因此出现错误。

您还可以在 ksqlDB 中使用 PRINT DXT;