将模式添加到具有特定 ID 的模式注册表

Add schema to Schema Registry with a specific Id

我们已经将 Confluent Schema Registry 与 KafkaStreams 结合使用一年多了,一切都运行良好;直到昨天。

在 UAT 环境中,我们似乎删除了架构主题,并且我们的一个应用程序开始故障转移并显示消息

[ERROR] LogAndFailExceptionHandler - Exception caught during Deserialization, taskId: 0_13, topic: TOPIC_NAME, partition: 13, offset: 0 org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 1531

我检查了 Schema Registry 并注意到主题丢失,并使用 curl 查询错误中列出的 ID 1531,例如:

curl -X GET http://SchemaRegistryHost:8081/schemas/ids/1531

然后回来了:

{"error_code":40403,"message":"Schema not found"}

我天真地只是想再次注册模式,没有考虑它并且它有效但是模式注册的 ID 与之前的 1531 ID 不同。

我需要注册到 ID 1531 的架构,因为主题中的现有消息已经在魔术字节中包含该 ID 1531。

我查看了 https://docs.confluent.io/current/schema-registry/docs/develop/api.html 的 API 文档,但没有看到任何关于为架构设置给定 ID 的内容。

有没有办法通过模式注册表将模式强制为特定的 Id?

我知道一些备份解决方案,但我现在正在寻找一个修复程序,希望能防止数据丢失或采取特殊措施来修复主题数据。

Is there anyway to force a schema to a specific Id with schema registry?

没有。


1531 的 ID 实际上不是 "gone",顺便说一下,它只是在注册表中 标记 已删除(使用 _schemas主题来查看)。


我知道您在使用 KafkaAvroDeserializer 时遇到的错误确实没有办法解决。您必须使用 ByteArrayDeserializer,然后 "fix" 或 "lookup" 使用 Schema Registry 客户端的正确 ID,然后反序列化消息的其余部分。

另一种选择是重置您的使用者组,以便您完全跳过这些消息,或者设置异常处理。