可以在控制台中使用 Kafka 消息但不能使用 Python 库吗?
Can consume Kafka messages in console but not with Python library?
我在我的笔记本电脑上进行本地操作,并尝试从远程服务器 'xxxxx' 读取主题 'test'。
使用控制台时,我启动了 zookeeper、Kafka,然后是消费者:
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
bin/kafka-console-consumer.sh --bootstrap-server xxxxx:9092 --topic test --from-beginning
消息将显示在控制台中。
但是当如下使用 Python 库时,我什么也没看到:
from kafka import KafkaConsumer
server = {'server': 'xxxxx:9092', 'topic': 'test'}
# To consume latest messages and auto-commit offsets
consumer = KafkaConsumer(server['topic'],
group_id='my-group',
bootstrap_servers=server['server'])
for message in consumer:
# message value and key are raw bytes -- decode if necessary!
# e.g., for unicode: `message.value.decode('utf-8')`
print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
我也可以在控制台中从 Kafka 本地成功地将消息发送到 Python Kafka 消费者,这个问题只在尝试使用远程消息时发生。
此外,与远程服务器的连接似乎建立良好(它可以看到我),但不幸的是什么都没有收到。
我找到的解决方案是使用另一个库 Confluent Kafka Python,这个库开箱即用,只需配置服务器 IP 和要收听的主题名称
编辑:这是我实施的解决方案:
我以为Avro库只是读取Avro文件,但它实际上解决了解码Kafka消息的问题,如下:我首先导入库并将模式文件作为参数,然后创建一个函数来解码将消息放入字典中,我可以在消费者循环中使用它。
from confluent_kafka import Consumer, KafkaError
from avro.io import DatumReader, BinaryDecoder
import avro.schema
schema = avro.schema.Parse(open("data_sources/EventRecord.avsc").read())
reader = DatumReader(schema)
def decode(msg_value):
message_bytes = io.BytesIO(msg_value)
decoder = BinaryDecoder(message_bytes)
event_dict = reader.read(decoder)
return event_dict
c = Consumer()
c.subscribe(topic)
running = True
while running:
msg = c.poll()
if not msg.error():
msg_value = msg.value()
event_dict = decode(msg_value)
print(event_dict)
elif msg.error().code() != KafkaError._PARTITION_EOF:
print(msg.error())
running = False
我在我的笔记本电脑上进行本地操作,并尝试从远程服务器 'xxxxx' 读取主题 'test'。 使用控制台时,我启动了 zookeeper、Kafka,然后是消费者:
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
bin/kafka-console-consumer.sh --bootstrap-server xxxxx:9092 --topic test --from-beginning
消息将显示在控制台中。 但是当如下使用 Python 库时,我什么也没看到:
from kafka import KafkaConsumer
server = {'server': 'xxxxx:9092', 'topic': 'test'}
# To consume latest messages and auto-commit offsets
consumer = KafkaConsumer(server['topic'],
group_id='my-group',
bootstrap_servers=server['server'])
for message in consumer:
# message value and key are raw bytes -- decode if necessary!
# e.g., for unicode: `message.value.decode('utf-8')`
print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
我也可以在控制台中从 Kafka 本地成功地将消息发送到 Python Kafka 消费者,这个问题只在尝试使用远程消息时发生。 此外,与远程服务器的连接似乎建立良好(它可以看到我),但不幸的是什么都没有收到。
我找到的解决方案是使用另一个库 Confluent Kafka Python,这个库开箱即用,只需配置服务器 IP 和要收听的主题名称
编辑:这是我实施的解决方案:
我以为Avro库只是读取Avro文件,但它实际上解决了解码Kafka消息的问题,如下:我首先导入库并将模式文件作为参数,然后创建一个函数来解码将消息放入字典中,我可以在消费者循环中使用它。
from confluent_kafka import Consumer, KafkaError
from avro.io import DatumReader, BinaryDecoder
import avro.schema
schema = avro.schema.Parse(open("data_sources/EventRecord.avsc").read())
reader = DatumReader(schema)
def decode(msg_value):
message_bytes = io.BytesIO(msg_value)
decoder = BinaryDecoder(message_bytes)
event_dict = reader.read(decoder)
return event_dict
c = Consumer()
c.subscribe(topic)
running = True
while running:
msg = c.poll()
if not msg.error():
msg_value = msg.value()
event_dict = decode(msg_value)
print(event_dict)
elif msg.error().code() != KafkaError._PARTITION_EOF:
print(msg.error())
running = False