Kafka + Avro + Python
I want to send messages with a producer and fetch them with a consumer. It has to be Avro, but I don't know how to do it. Take a look:
schema = { "type": "record", "name": "myrecord", "fields": [{"name": "typ", "type": "string"}, {"name": "pred", "type": "int"}] }
producer = KafkaProducer(bootstrap_servers=['xxxx:xxxx'], value_serializer=avro.schema.parse(json.dumps(schema)))
for i in range(100):
    message = {"typ": "sth", "pred": i}
    producer.send("xxxx", value=message)
Can you help me do this properly?
With kafka-python, value_serializer needs to be a function of the value, not a parsed Avro schema.
For example:

import io
import json
from kafka import KafkaProducer
import avro.schema
from avro.io import DatumWriter, BinaryEncoder

schema_def = { ... }
schema = avro.schema.parse(json.dumps(schema_def))

def serialize(value):
    buf = io.BytesIO()
    writer = DatumWriter(schema)             # bind the schema to the writer
    writer.write(value, BinaryEncoder(buf))  # encode the value into the buffer
    return buf.getvalue()                    # return the raw Avro bytes

producer = KafkaProducer(value_serializer=serialize)
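For intuition, here is what those bytes look like on the wire. Avro's binary encoding of a record is just the field values in schema order, with no field names or markers: a string is a zigzag-varint length followed by its UTF-8 bytes, and an int is a zigzag varint. A minimal pure-Python sketch for this particular {"typ", "pred"} record, for illustration only (use DatumWriter in real code):

```python
def zigzag_varint(n: int) -> bytes:
    """Encode an int as an Avro zigzag varint (little-endian base-128)."""
    z = (n << 1) ^ (n >> 63)  # zigzag: maps 0, -1, 1, -2, ... to 0, 1, 2, 3, ...
    out = bytearray()
    while True:
        byte = z & 0x7F
        z >>= 7
        if z:
            out.append(byte | 0x80)  # set the continuation bit
        else:
            out.append(byte)
            return bytes(out)

def encode_record(value: dict) -> bytes:
    """Hand-roll the Avro binary encoding of {"typ": str, "pred": int}."""
    typ = value["typ"].encode("utf-8")
    return zigzag_varint(len(typ)) + typ + zigzag_varint(value["pred"])

print(encode_record({"typ": "sth", "pred": 5}))  # b'\x06sth\n'
```

Note there is no schema or framing in the payload itself; both sides must already agree on the schema (which is why the Confluent stack pairs this with a schema registry).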
That is more work than you really need to do, though. Check out the confluent-kafka-python example code instead - https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/avro_producer.py
Something like this should do the trick:
from kafka import KafkaProducer
import io
from avro.schema import Parse
from avro.io import DatumWriter, BinaryEncoder

# Create a Kafka client ready to produce messages
producer = KafkaProducer(bootstrap_servers=bootstrap_address,
                         security_protocol="...", ...)

# Load the schema used to serialize the message
with open(FILENAME_WHERE_YOU_HAVE_YOUR_AVRO_SCHEMA, "rb") as f:
    schema = Parse(f.read())

# Serialize the message data using the schema
buf = io.BytesIO()
encoder = BinaryEncoder(buf)
writer = DatumWriter(writer_schema=schema)
writer.write(myobject, encoder)
message_data = buf.getvalue()

# Message key, if needed
key = None
# Headers, if needed
headers = []

# Send the serialized message to the Kafka topic
producer.send(topicname,
              message_data,
              key,
              headers)
producer.flush()