从 Kafka 轮询几条消息
Poll several messages from Kafka
我正在使用 confluent_kafka 软件包来处理 Kafka。
我是这样创建话题的:
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
def my_producer():
bootstrap_servers=['my_adress.com:9092',
'my_adress.com:9092']
value_schema = avro.load('/home/ValueSchema.avsc')
avroProducer = AvroProducer({
'bootstrap.servers': bootstrap_servers[0]+','+bootstrap_servers[1],
'schema.registry.url':'http://my_adress.com:8081',
},
default_value_schema=value_schema
)
for i in range(0, 25000):
value = {"name":"Yuva","favorite_number":10,"favorite_color":"green","age":i*2}
avroProducer.produce(topic='my_topik14', value=value)
avroProducer.flush(0)
print('Finished!')
if __name__ == '__main__':
my_producer()
有效。 (顺便说一下,这会收到 24820 条消息,而不是 25000 条……)
我们可以检查一下:
kafka-run-class kafka.tools.GetOffsetShell --broker-list my_adress.com:9092 --topic my_topik14
my_topik14:0:24819
现在我要消费:
from confluent_kafka import KafkaError
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError
bootstrap_servers=['my_adress.com:9092',
'my_adress.com:9092']
c = AvroConsumer(
{'bootstrap.servers': bootstrap_servers[0]+','+bootstrap_servers[1],
'group.id': 'avroneversleeps',
'schema.registry.url': 'http://my_adress.com:8081',
'api.version.request': True,
'fetch.min.bytes': 100000,
'consume.callback.max.messages':1000,
'batch.num.messages':2
})
c.subscribe(['my_topik14'])
running = True
while running:
msg = None
try:
msg = c.poll(0.1)
if msg:
if not msg.error():
print(msg.value())
c.commit(msg)
elif msg.error().code() != KafkaError._PARTITION_EOF:
print(msg.error())
running = False
else:
print("No Message!! Happily trying again!!")
except SerializerError as e:
print("Message deserialization failed for %s: %s" % (msg, e))
running = False
c.commit()
c.close()
但是有个问题:
我只是一条一条地阅读消息。
我的问题是如何读取批量消息?
我在 Consumer config 中尝试了不同的参数,但它们没有任何变化!
我还在 SO 上找到 this question 并尝试了相同的参数 - 它仍然不起作用。
另请阅读此内容。但这违背了之前的link...
您可以使用 consume([num_messages=1][, timeout=-1])
方法来完成。 API 参考。这里:
对于 AvroConsumer:
https://docs.confluent.io/current/clients/confluent-kafka-python/index.html?highlight=avroconsumer#confluent_kafka.Consumer.consume
有关此问题的更多信息:
https://github.com/confluentinc/confluent-kafka-python/issues/252
AvroConsumer 没有 consume
方法。但是很容易让我自己实现这个方法,就像在 Consume class(AvroConsumer 的父级)中一样。
这是代码:
def consume_batch(self, num_messages=1, timeout=None):
"""
This is an overriden method from confluent_kafka.Consumer class. This handles batch of message
deserialization using avro schema
:param int num_messages: number of messages to read in one batch (default=1)
:param float timeout: Poll timeout in seconds (default: indefinite)
:returns: list of messages objects with deserialized key and value as dict objects
:rtype: Message
"""
messages_out = []
if timeout is None:
timeout = -1
messages = super(AvroConsumer, self).consume(num_messages=num_messages, timeout=timeout)
if messages is None:
return None
else:
for m in messages:
if not m.value() and not m.key():
return messages
if not m.error():
if m.value() is not None:
decoded_value = self._serializer.decode_message(m.value())
m.set_value(decoded_value)
if m.key() is not None:
decoded_key = self._serializer.decode_message(m.key())
m.set_key(decoded_key)
messages_out.append(m)
#print(len(message))
return messages_out
但是在那之后我们运行 测试并且这个方法没有任何性能提升。所以看起来只是为了更好的可用性。或者我需要做一些额外的工作来序列化整个批次而不是单个消息。
我正在使用 confluent_kafka 软件包来处理 Kafka。 我是这样创建话题的:
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
def my_producer():
bootstrap_servers=['my_adress.com:9092',
'my_adress.com:9092']
value_schema = avro.load('/home/ValueSchema.avsc')
avroProducer = AvroProducer({
'bootstrap.servers': bootstrap_servers[0]+','+bootstrap_servers[1],
'schema.registry.url':'http://my_adress.com:8081',
},
default_value_schema=value_schema
)
for i in range(0, 25000):
value = {"name":"Yuva","favorite_number":10,"favorite_color":"green","age":i*2}
avroProducer.produce(topic='my_topik14', value=value)
avroProducer.flush(0)
print('Finished!')
if __name__ == '__main__':
my_producer()
有效。 (顺便说一下,这会收到 24820 条消息,而不是 25000 条……) 我们可以检查一下:
kafka-run-class kafka.tools.GetOffsetShell --broker-list my_adress.com:9092 --topic my_topik14
my_topik14:0:24819
现在我要消费:
from confluent_kafka import KafkaError
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError
bootstrap_servers=['my_adress.com:9092',
'my_adress.com:9092']
c = AvroConsumer(
{'bootstrap.servers': bootstrap_servers[0]+','+bootstrap_servers[1],
'group.id': 'avroneversleeps',
'schema.registry.url': 'http://my_adress.com:8081',
'api.version.request': True,
'fetch.min.bytes': 100000,
'consume.callback.max.messages':1000,
'batch.num.messages':2
})
c.subscribe(['my_topik14'])
running = True
while running:
msg = None
try:
msg = c.poll(0.1)
if msg:
if not msg.error():
print(msg.value())
c.commit(msg)
elif msg.error().code() != KafkaError._PARTITION_EOF:
print(msg.error())
running = False
else:
print("No Message!! Happily trying again!!")
except SerializerError as e:
print("Message deserialization failed for %s: %s" % (msg, e))
running = False
c.commit()
c.close()
但是有个问题: 我只是一条一条地阅读消息。 我的问题是如何读取批量消息? 我在 Consumer config 中尝试了不同的参数,但它们没有任何变化!
我还在 SO 上找到 this question 并尝试了相同的参数 - 它仍然不起作用。
另请阅读此内容。但这违背了之前的link...
您可以使用 consume([num_messages=1][, timeout=-1])
方法来完成。 API 参考。这里:
对于 AvroConsumer: https://docs.confluent.io/current/clients/confluent-kafka-python/index.html?highlight=avroconsumer#confluent_kafka.Consumer.consume
有关此问题的更多信息:
https://github.com/confluentinc/confluent-kafka-python/issues/252
AvroConsumer 没有 consume
方法。但是很容易让我自己实现这个方法,就像在 Consume class(AvroConsumer 的父级)中一样。
这是代码:
def consume_batch(self, num_messages=1, timeout=None):
"""
This is an overriden method from confluent_kafka.Consumer class. This handles batch of message
deserialization using avro schema
:param int num_messages: number of messages to read in one batch (default=1)
:param float timeout: Poll timeout in seconds (default: indefinite)
:returns: list of messages objects with deserialized key and value as dict objects
:rtype: Message
"""
messages_out = []
if timeout is None:
timeout = -1
messages = super(AvroConsumer, self).consume(num_messages=num_messages, timeout=timeout)
if messages is None:
return None
else:
for m in messages:
if not m.value() and not m.key():
return messages
if not m.error():
if m.value() is not None:
decoded_value = self._serializer.decode_message(m.value())
m.set_value(decoded_value)
if m.key() is not None:
decoded_key = self._serializer.decode_message(m.key())
m.set_key(decoded_key)
messages_out.append(m)
#print(len(message))
return messages_out
但是在那之后我们运行 测试并且这个方法没有任何性能提升。所以看起来只是为了更好的可用性。或者我需要做一些额外的工作来序列化整个批次而不是单个消息。