Confluent-Kafka:Python 消费者中 Avro 序列化与模式处理的混淆
Confluent-Kafka: Avro Serialization Confusions with Schema Handling in Python Consumers
我正在尝试了解 Confluent Kafka 上的 Avro 序列化以及 Schema Registry 的使用。到最后一切都很顺利,但 AVRO 的最终期望让我很困惑。根据我的阅读和理解,Avro 序列化为我们提供了灵活性,当我们更改架构时,我们可以简单地管理它而不会影响旧的 producer/consumer.
同样,我开发了一个 python 生产者,它将检查架构注册表中是否存在架构,如果不存在,则创建它并开始生成下面显示的 json 消息。当我需要更改架构时,我只需在我的生产者中更新它,这就会生成具有新架构的消息。
我的旧架构:
data = '{"schema":"{\"type\":\"record\",\"name\":\"value\",\"namespace\":\"my.test\",\"fields\":[{\"name\":\"fname\",\"type\":\"string\"},{\"name\":\"lname\",\"type\":\"string\"},{\"name\":\"email\",\"type\":\"string\"},{\"name\":\"principal\",\"type\":\"string\"},{\"name\":\"ipaddress\",\"type\":\"string\"},{\"name\":\"mobile\",\"type\":\"long\"},{\"name\":\"passport_make_date\",\"type\":[\"string\",\"null\"],\"logicalType\":\"timestamp\",\"default\":\"None\"},{\"name\":\"passport_expiry_date\",\"type\":\"string\",\"logicalType\":\"date\"}]}"}'
来自 Producer-1 的示例数据:
{u'mobile': 9819841242, u'lname': u'Rogers', u'passport_expiry_date': u'2026-05-21', u'passport_make_date': u'2016-05-21', u'fname': u'tom', u'ipaddress': u'208.103.236.60', u'email': u'tom_Rogers@TEST.co.nz', u'principal': u'tom@EXAMPLE.COM'}
我的新架构:
data = '{"schema":"{\"type\":\"record\",\"name\":\"value\",\"namespace\":\"my.test\",\"fields\":[{\"name\":\"fname\",\"type\":\"string\"},{\"name\":\"lname\",\"type\":\"string\"},{\"name\":\"email\",\"type\":\"string\"},{\"name\":\"principal\",\"type\":\"string\"},{\"name\":\"ipaddress\",\"type\":\"string\"},{\"name\":\"mobile\",\"type\":\"long\"},{\"name\":\"new_passport_make_date\",\"type\":[\"string\",\"null\"],\"logicalType\":\"timestamp\",\"default\":\"None\"},{\"name\":\"new_passport_expiry_date\",\"type\":\"string\",\"logicalType\":\"date\"}]}"}'
来自 Producer-2 的示例数据:
{u'mobile': 9800647004, u'new_passport_make_date': u'2011-05-22', u'lname': u'Reed', u'fname': u'Paul', u'new_passport_expiry_date': u'2021-05-22', u'ipaddress': u'134.124.7.28', u'email': u'Paul_Reed@nbc.com', u'principal': u'Paul@EXAMPLE.COM'}
案例 1:当我有 2 个具有以上 2 个模式的生产者 运行 在一起时,我可以使用以下代码成功使用消息。到这里为止一切都很好。
while True:
try:
msg = c.poll(10)
except SerializerError as e:
xxxxx
break
print msg.value()
案例 2:当我在 JSON 领域稍微深入时,事情变得混乱和破裂。
首先,假设我有一个生产者 运行 上面的“我的旧模式”和一个成功使用这些消息的消费者。
print msg.value()["fname"] , msg.value()["lname"] , msg.value()["passport_make_date"], msg.value()["passport_expiry_date"]
当我 运行 第二个生产者使用上面提到的“我的新模式”时,我的老消费者因为没有字段 passport_expiry_date 和 passport_make_date 而中断,这是真的。
问题:
有时我认为,这是预料之中的,因为是我(开发人员)使用了不在消息中的字段名称。但是 Avro 如何在这里提供帮助?缺失的字段不应该由 Avro 处理吗?我在 JAVA 中看到了正确处理这种情况的示例,但在 Python 中没有找到任何示例。例如,下面 github 有处理这种情况的完美示例。当该字段不存在时,消费者只需打印 'None'.
https://github.com/LearningJournal/ApacheKafkaTutorials
案例 3:当我 运行 旧生产者与旧消费者的组合然后在另一个终端新生产者与新消费者时,Producers/Consumers 混淆并且事情破裂说不 json 字段。
老消费者==>
print msg.value()["fname"] , msg.value()["lname"] , msg.value()["passport_make_date"], msg.value()["passport_expiry_date"]
新消费者 ==>
print msg.value()["fname"] , msg.value()["lname"] , msg.value()["new_passport_make_date"], msg.value()["new_passport_expiry_date"]
问题:
我再次认为,这是意料之中的。但是,Avro 让我认为正确的消费者应该通过正确的模式获得正确的消息。如果我使用 msg.value() 并且总是使用没有任何 Avro 角色的编程来解析消费者端的字段,那么使用 avro 的好处在哪里?在 SR 中使用 messages/storing 发送模式有什么好处?
最后,有什么方法可以检查附加到消息的模式吗?据我所知,在 Avro 中,模式 ID 附加在消息中,在读取和写入消息时进一步与模式注册表一起使用。但是我从来没有在消息中看到它。
在此先感谢。
不清楚您在注册表中使用的是什么兼容性设置,但我会向后假设,这意味着您需要添加一个具有默认值的字段。
听起来您得到的是 Python KeyError
,因为这些键不存在。
代替msg.value()["non-existing-key"]
,您可以尝试
选项 1:将其视为 dict()
msg.value().get("non-existing-key", "Default value")
选项 2:单独检查所有可能不存在的键
some_var = None # What you want to parse
val = msg.value()
if "non-existing-key" not in val:
some_var = "Default Value"
否则,您必须 "project" 旧数据上的新模式,这就是 Java 代码通过使用 SpecificRecord
子类所做的事情。这样,较旧的数据将使用较新的模式进行解析,该模式具有较新的字段及其默认值。
如果您在 Java 中使用 GenericRecord
,您会遇到类似的问题。我不确定 Python 中是否存在与 Java 中的 SpecificRecord
等效的内容。
顺便说一下,我认为字符串 "None"
不能申请 logicalType=timestamp
我正在尝试了解 Confluent Kafka 上的 Avro 序列化以及 Schema Registry 的使用。到最后一切都很顺利,但 AVRO 的最终期望让我很困惑。根据我的阅读和理解,Avro 序列化为我们提供了灵活性,当我们更改架构时,我们可以简单地管理它而不会影响旧的 producer/consumer.
同样,我开发了一个 python 生产者,它将检查架构注册表中是否存在架构,如果不存在,则创建它并开始生成下面显示的 json 消息。当我需要更改架构时,我只需在我的生产者中更新它,这就会生成具有新架构的消息。
我的旧架构:
data = '{"schema":"{\"type\":\"record\",\"name\":\"value\",\"namespace\":\"my.test\",\"fields\":[{\"name\":\"fname\",\"type\":\"string\"},{\"name\":\"lname\",\"type\":\"string\"},{\"name\":\"email\",\"type\":\"string\"},{\"name\":\"principal\",\"type\":\"string\"},{\"name\":\"ipaddress\",\"type\":\"string\"},{\"name\":\"mobile\",\"type\":\"long\"},{\"name\":\"passport_make_date\",\"type\":[\"string\",\"null\"],\"logicalType\":\"timestamp\",\"default\":\"None\"},{\"name\":\"passport_expiry_date\",\"type\":\"string\",\"logicalType\":\"date\"}]}"}'
来自 Producer-1 的示例数据:
{u'mobile': 9819841242, u'lname': u'Rogers', u'passport_expiry_date': u'2026-05-21', u'passport_make_date': u'2016-05-21', u'fname': u'tom', u'ipaddress': u'208.103.236.60', u'email': u'tom_Rogers@TEST.co.nz', u'principal': u'tom@EXAMPLE.COM'}
我的新架构:
data = '{"schema":"{\"type\":\"record\",\"name\":\"value\",\"namespace\":\"my.test\",\"fields\":[{\"name\":\"fname\",\"type\":\"string\"},{\"name\":\"lname\",\"type\":\"string\"},{\"name\":\"email\",\"type\":\"string\"},{\"name\":\"principal\",\"type\":\"string\"},{\"name\":\"ipaddress\",\"type\":\"string\"},{\"name\":\"mobile\",\"type\":\"long\"},{\"name\":\"new_passport_make_date\",\"type\":[\"string\",\"null\"],\"logicalType\":\"timestamp\",\"default\":\"None\"},{\"name\":\"new_passport_expiry_date\",\"type\":\"string\",\"logicalType\":\"date\"}]}"}'
来自 Producer-2 的示例数据:
{u'mobile': 9800647004, u'new_passport_make_date': u'2011-05-22', u'lname': u'Reed', u'fname': u'Paul', u'new_passport_expiry_date': u'2021-05-22', u'ipaddress': u'134.124.7.28', u'email': u'Paul_Reed@nbc.com', u'principal': u'Paul@EXAMPLE.COM'}
案例 1:当我有 2 个具有以上 2 个模式的生产者 运行 在一起时,我可以使用以下代码成功使用消息。到这里为止一切都很好。
while True:
try:
msg = c.poll(10)
except SerializerError as e:
xxxxx
break
print msg.value()
案例 2:当我在 JSON 领域稍微深入时,事情变得混乱和破裂。
首先,假设我有一个生产者 运行 上面的“我的旧模式”和一个成功使用这些消息的消费者。
print msg.value()["fname"] , msg.value()["lname"] , msg.value()["passport_make_date"], msg.value()["passport_expiry_date"]
当我 运行 第二个生产者使用上面提到的“我的新模式”时,我的老消费者因为没有字段 passport_expiry_date 和 passport_make_date 而中断,这是真的。
问题:
有时我认为,这是预料之中的,因为是我(开发人员)使用了不在消息中的字段名称。但是 Avro 如何在这里提供帮助?缺失的字段不应该由 Avro 处理吗?我在 JAVA 中看到了正确处理这种情况的示例,但在 Python 中没有找到任何示例。例如,下面 github 有处理这种情况的完美示例。当该字段不存在时,消费者只需打印 'None'.
https://github.com/LearningJournal/ApacheKafkaTutorials
案例 3:当我 运行 旧生产者与旧消费者的组合然后在另一个终端新生产者与新消费者时,Producers/Consumers 混淆并且事情破裂说不 json 字段。
老消费者==>
print msg.value()["fname"] , msg.value()["lname"] , msg.value()["passport_make_date"], msg.value()["passport_expiry_date"]
新消费者 ==>
print msg.value()["fname"] , msg.value()["lname"] , msg.value()["new_passport_make_date"], msg.value()["new_passport_expiry_date"]
问题:
我再次认为,这是意料之中的。但是,Avro 让我认为正确的消费者应该通过正确的模式获得正确的消息。如果我使用 msg.value() 并且总是使用没有任何 Avro 角色的编程来解析消费者端的字段,那么使用 avro 的好处在哪里?在 SR 中使用 messages/storing 发送模式有什么好处?
最后,有什么方法可以检查附加到消息的模式吗?据我所知,在 Avro 中,模式 ID 附加在消息中,在读取和写入消息时进一步与模式注册表一起使用。但是我从来没有在消息中看到它。
在此先感谢。
不清楚您在注册表中使用的是什么兼容性设置,但我会向后假设,这意味着您需要添加一个具有默认值的字段。
听起来您得到的是 Python KeyError
,因为这些键不存在。
代替msg.value()["non-existing-key"]
,您可以尝试
选项 1:将其视为 dict()
msg.value().get("non-existing-key", "Default value")
选项 2:单独检查所有可能不存在的键
some_var = None # What you want to parse
val = msg.value()
if "non-existing-key" not in val:
some_var = "Default Value"
否则,您必须 "project" 旧数据上的新模式,这就是 Java 代码通过使用 SpecificRecord
子类所做的事情。这样,较旧的数据将使用较新的模式进行解析,该模式具有较新的字段及其默认值。
如果您在 Java 中使用 GenericRecord
,您会遇到类似的问题。我不确定 Python 中是否存在与 Java 中的 SpecificRecord
等效的内容。
顺便说一下,我认为字符串 "None"
不能申请 logicalType=timestamp