Encoding / formatting issues with python kafka library
I've been trying to use the python kafka library and can't get a producer to work.
After some research I found out that Kafka sends (and, I'm guessing, also expects) an additional 5-byte header to the consumer (one zero byte and one long holding the schema ID from the schema-registry). I've managed to get the consumer working by simply stripping off those first bytes.
Am I supposed to add a similar header when writing the producer?
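For reference, the header I am describing appears to be the Confluent wire format: one magic byte set to 0, then the schema-registry schema ID as a 4-byte big-endian integer, then the Avro-encoded payload. A minimal sketch of packing and unpacking just that header (function names are only illustrative):
import struct

def pack_confluent_header(schema_id):
    # magic byte 0 + schema id as a 4-byte big-endian unsigned int = 5 bytes total
    return struct.pack(">bI", 0, schema_id)

def split_confluent_message(raw):
    magic, schema_id = struct.unpack(">bI", raw[:5])
    return magic, schema_id, raw[5:]  # the rest is the Avro-encoded payload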
Here is the exception that comes up:
[2016-09-14 13:32:48,684] ERROR Task hdfs-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
org.apache.kafka.connect.errors.DataException: Failed to deserialize data to Avro:
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:109)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:357)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
I'm using the latest stable versions of both kafka and the python-kafka library.
Edit
Consumer
from kafka import KafkaConsumer
import avro.io
import avro.schema
import io
import requests
import struct
# To consume messages
consumer = KafkaConsumer('hadoop_00',
                         group_id='my_group',
                         bootstrap_servers=['hadoop-master:9092'])

schema_path = "resources/f1.avsc"

for msg in consumer:
    value = bytearray(msg.value)
    # bytes 1-4 of the Confluent header hold the schema id
    schema_id = struct.unpack(">L", value[1:5])[0]
    # fetch the writer schema from the Schema Registry by id
    response = requests.get("http://hadoop-master:8081/schemas/ids/" + str(schema_id))
    schema = response.json()["schema"]
    schema = avro.schema.parse(schema)
    # skip the 5-byte header; the rest is the Avro-encoded payload
    bytes_reader = io.BytesIO(value[5:])
    # bytes_reader = io.BytesIO(msg.value)
    decoder = avro.io.BinaryDecoder(bytes_reader)
    reader = avro.io.DatumReader(schema)
    temp = reader.read(decoder)
    print(temp)
Producer
from kafka import KafkaProducer
import avro.schema
import io
from avro.io import DatumWriter
producer = KafkaProducer(bootstrap_servers="hadoop-master")
# Kafka topic
topic = "hadoop_00"
# Path to user.avsc avro schema
schema_path = "resources/f1.avsc"
schema = avro.schema.parse(open(schema_path).read())
range = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
for i in range:
    producer.send(topic, b'{"f1":"value_' + str(i))
Since you are reading with a BinaryDecoder and a DatumReader, if you send your data the other way round (encoding it with a DatumWriter and a BinaryEncoder), I guess your messages will be fine.
Something like this:
Producer
from kafka import KafkaProducer
import avro.schema
import io
from avro.io import DatumWriter, BinaryEncoder
producer = KafkaProducer(bootstrap_servers="hadoop-master")
# Kafka topic
topic = "hadoop_00"
# Path to user.avsc avro schema
schema_path = "resources/f1.avsc"
schema = avro.schema.parse(open(schema_path).read())
# range is a bad variable name. I changed it here
value_range = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
for i in value_range:
    datum_writer = DatumWriter(schema)
    byte_writer = io.BytesIO()
    datum_encoder = BinaryEncoder(byte_writer)
    datum_writer.write({"f1": "value_%d" % (i)}, datum_encoder)
    producer.send(topic, byte_writer.getvalue())
The few changes I made are:
- Use DatumWriter and BinaryEncoder.
- Instead of a JSON string, I am sending a dictionary in the byte stream (you may want to check your code with a plain dictionary as well; it might also work, but I am not sure).
- Send the message to the Kafka topic as a byte stream (for me it sometimes fails; in those cases I assign the result of the .getvalue() method to a variable and pass that variable to producer.send, as sketched below. I don't know why it fails, but assigning it to a variable has always worked).
I wasn't able to test the code I added, but it is a piece of code I wrote earlier when I was working with Avro. If it doesn't work for you, let me know in the comments; it may just be my rusty memory. I'll update this answer with a working one once I get home, where I can test the code.
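To make the last point concrete, the per-message flow with the byte stream assigned to a variable before the send would look roughly like this (same assumptions as the code above):
datum_writer = DatumWriter(schema)
byte_writer = io.BytesIO()
datum_encoder = BinaryEncoder(byte_writer)
datum_writer.write({"f1": "value_1"}, datum_encoder)  # a plain dict, not a JSON string
raw_bytes = byte_writer.getvalue()  # assign to a variable first...
producer.send(topic, raw_bytes)     # ...then pass the variable to send()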
I was able to get my Python producer to send messages to Kafka-Connect with the Schema-Registry:
...
import io
import avro.datafile
import avro.io
import avro.schema
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='kafka:9092')

with open('schema.avsc') as f:
    schema = avro.schema.Parse(f.read())

def post_message():
    bytes_writer = io.BytesIO()

    # Write the Confluent "Magic Byte"
    bytes_writer.write(bytes([0]))

    # Should get or create the schema version with Schema-Registry
    ...
    schema_version = 1
    bytes_writer.write(
        int.to_bytes(schema_version, 4, byteorder='big'))

    # and then the standard Avro bytes serialization
    writer = avro.io.DatumWriter(schema)
    encoder = avro.io.BinaryEncoder(bytes_writer)
    writer.write({'key': 'value'}, encoder)

    producer.send('topic', value=bytes_writer.getvalue())
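The hard-coded schema_version = 1 above stands in for the value the Schema Registry hands back. A hedged sketch of registering the schema over the registry's REST API and using the returned id (the registry URL and subject name here are assumptions):
import json
import requests

def register_schema(registry_url, subject, schema_str):
    # POST the schema; the registry returns a globally unique schema id,
    # which is what goes into the 4 bytes after the magic byte.
    resp = requests.post(
        "%s/subjects/%s/versions" % (registry_url, subject),
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
        data=json.dumps({"schema": schema_str}),
    )
    resp.raise_for_status()
    return resp.json()["id"]

# e.g. (hypothetical registry URL and subject name):
# schema_id = register_schema("http://kafka:8081", "topic-value", open('schema.avsc').read())
# bytes_writer.write(int.to_bytes(schema_id, 4, byteorder='big'))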
Documentation about the "Magic Byte":
https://github.com/confluentinc/schema-registry/blob/master/docs/serializer-formatter.rst