如何使用来自 Kafka 的 Python decode/deserialize Avro
How to decode/deserialize Avro with Python from Kafka
我在 Python(使用 Confluent Kafka Python 库的消费者)中从远程服务器 Kafka Avro 消息接收到消息,这些消息表示点击流数据 json 字典的字段如下用户代理、位置、url 等。消息如下所示:
b'\x01\x00\x00\xde\x9e\xa8\xd5\x8fW\xec\x9a\xa8\xd5\x8fW\x1axxx.xxx.xxx.xxx\x02:https://website.in/rooms/\x02Hhttps://website.in/wellness-spa/\x02\xaa\x14\x02\x9c\n\x02\xaa\x14\x02\xd0\x0b\x02V0:j3lcu1if:rTftGozmxSPo96dz1kGH2hvd0CREXmf2\x02V0:j3lj1xt7:YD4daqNRv_Vsea4wuFErpDaWeHu4tW7e\x02\x08null\x02\nnull0\x10pageview\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x10Thailand\x02\xa6\x80\xc4\x01\x02\x0eBangkok\x02\x8c\xba\xc4\x01\x020*\xa9\x13\xd0\x84+@\x02\xec\xc09#J\x1fY@\x02\x8a\x02Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/58.0.3029.96 Chrome/58.0.3029.96 Safari/537.36\x02\x10Chromium\x02\x10Chromium\x028Google Inc. and contributors\x02\x0eBrowser\x02\x1858.0.3029.96\x02"Personal computer\x02\nLinux\x02\x00\x02\x1cCanonical Ltd.'
如何解码?我尝试了 bson 解码,但该字符串未被识别为 UTF-8,因为我猜它是一种特定的 Avro 编码。我找到 https://github.com/verisign/python-confluent-schemaregistry 但它只支持 Python 2.7。理想情况下,我想使用 Python 3.5+ 和 MongoDB 来处理数据并将其存储为我当前的基础架构。
我以为Avro库只是读取Avro文件,但它实际上解决了解码Kafka消息的问题,如下:我首先导入库并将模式文件作为参数,然后创建一个函数来解码将消息放入字典中,我可以在消费者循环中使用它。
import io
from confluent_kafka import Consumer, KafkaError
from avro.io import DatumReader, BinaryDecoder
import avro.schema
schema = avro.schema.Parse(open("data_sources/EventRecord.avsc").read())
reader = DatumReader(schema)
def decode(msg_value):
message_bytes = io.BytesIO(msg_value)
decoder = BinaryDecoder(message_bytes)
event_dict = reader.read(decoder)
return event_dict
c = Consumer()
c.subscribe(topic)
running = True
while running:
msg = c.poll()
if not msg.error():
msg_value = msg.value()
event_dict = decode(msg_value)
print(event_dict)
elif msg.error().code() != KafkaError._PARTITION_EOF:
print(msg.error())
running = False
如果您使用 Confluent Schema Registry 并想要反序列化 avro 消息,只需将 message_bytes.seek(5) 添加到解码函数中,因为 Confluent 在典型的 avro 格式数据之前添加了 5 个额外字节。
def decode(msg_value):
message_bytes = io.BytesIO(msg_value)
message_bytes.seek(5)
decoder = BinaryDecoder(message_bytes)
event_dict = reader.read(decoder)
return event_dict
如果您可以访问 Confluent 架构注册服务器,您也可以使用 Confluent 自己的 AvroDeserializer
以避免弄乱他们神奇的 5 个字节:
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
def process_record_confluent(record: bytes, src: SchemaRegistryClient, schema: str):
deserializer = AvroDeserializer(schema_str=schema, schema_registry_client=src)
return deserializer(record, None) # returns dict
我在 Python(使用 Confluent Kafka Python 库的消费者)中从远程服务器 Kafka Avro 消息接收到消息,这些消息表示点击流数据 json 字典的字段如下用户代理、位置、url 等。消息如下所示:
b'\x01\x00\x00\xde\x9e\xa8\xd5\x8fW\xec\x9a\xa8\xd5\x8fW\x1axxx.xxx.xxx.xxx\x02:https://website.in/rooms/\x02Hhttps://website.in/wellness-spa/\x02\xaa\x14\x02\x9c\n\x02\xaa\x14\x02\xd0\x0b\x02V0:j3lcu1if:rTftGozmxSPo96dz1kGH2hvd0CREXmf2\x02V0:j3lj1xt7:YD4daqNRv_Vsea4wuFErpDaWeHu4tW7e\x02\x08null\x02\nnull0\x10pageview\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x10Thailand\x02\xa6\x80\xc4\x01\x02\x0eBangkok\x02\x8c\xba\xc4\x01\x020*\xa9\x13\xd0\x84+@\x02\xec\xc09#J\x1fY@\x02\x8a\x02Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/58.0.3029.96 Chrome/58.0.3029.96 Safari/537.36\x02\x10Chromium\x02\x10Chromium\x028Google Inc. and contributors\x02\x0eBrowser\x02\x1858.0.3029.96\x02"Personal computer\x02\nLinux\x02\x00\x02\x1cCanonical Ltd.'
如何解码?我尝试了 bson 解码,但该字符串未被识别为 UTF-8,因为我猜它是一种特定的 Avro 编码。我找到 https://github.com/verisign/python-confluent-schemaregistry 但它只支持 Python 2.7。理想情况下,我想使用 Python 3.5+ 和 MongoDB 来处理数据并将其存储为我当前的基础架构。
我以为Avro库只是读取Avro文件,但它实际上解决了解码Kafka消息的问题,如下:我首先导入库并将模式文件作为参数,然后创建一个函数来解码将消息放入字典中,我可以在消费者循环中使用它。
import io
from confluent_kafka import Consumer, KafkaError
from avro.io import DatumReader, BinaryDecoder
import avro.schema
schema = avro.schema.Parse(open("data_sources/EventRecord.avsc").read())
reader = DatumReader(schema)
def decode(msg_value):
message_bytes = io.BytesIO(msg_value)
decoder = BinaryDecoder(message_bytes)
event_dict = reader.read(decoder)
return event_dict
c = Consumer()
c.subscribe(topic)
running = True
while running:
msg = c.poll()
if not msg.error():
msg_value = msg.value()
event_dict = decode(msg_value)
print(event_dict)
elif msg.error().code() != KafkaError._PARTITION_EOF:
print(msg.error())
running = False
如果您使用 Confluent Schema Registry 并想要反序列化 avro 消息,只需将 message_bytes.seek(5) 添加到解码函数中,因为 Confluent 在典型的 avro 格式数据之前添加了 5 个额外字节。
def decode(msg_value):
message_bytes = io.BytesIO(msg_value)
message_bytes.seek(5)
decoder = BinaryDecoder(message_bytes)
event_dict = reader.read(decoder)
return event_dict
如果您可以访问 Confluent 架构注册服务器,您也可以使用 Confluent 自己的 AvroDeserializer
以避免弄乱他们神奇的 5 个字节:
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
def process_record_confluent(record: bytes, src: SchemaRegistryClient, schema: str):
deserializer = AvroDeserializer(schema_str=schema, schema_registry_client=src)
return deserializer(record, None) # returns dict