Kafka message key is None
Here is my implementation of a Kafka producer and consumer:
    async def produce(topic_name):
        """Produces data into the Kafka Topic"""
        p = Producer({"bootstrap.servers": BROKER_URL})
        curr_iteration = 0
        while True:
            p.produce(topic_name, f"iteration {curr_iteration}".encode("utf-8"))
            curr_iteration += 1
            await asyncio.sleep(0.5)

    async def consume(topic_name):
        """Consumes data from the Kafka Topic"""
        c = Consumer({"bootstrap.servers": BROKER_URL, "group.id": "0"})
        c.subscribe([topic_name])
        while True:
            message = c.poll(1.0)
            if message is None:
                print("no message received by consumer")
            elif message.error() is not None:
                print(f"error from consumer {message.error()}")
            else:
                print(f"consumed message {message.key()}: {message.value()}")
            await asyncio.sleep(2.5)
message.key() is None, and the console output looks like this:
consumed message None: b'iteration 1'
consumed message None: b'iteration 8'
consumed message None: b'iteration 12'
consumed message None: b'iteration 15'
How do I update the code so that the consumer receives a message key?
The producer never sets a key, so the consumer has nothing to read. Update your produce call as shown below:
p.produce(topic, key="key", value="value")
To initiate sending a message to Kafka, call the produce method, passing in the message value (which may be None) and optionally a key, partition, and callback. The produce call will complete immediately and does not return a value. A KafkaException will be thrown if the message could not be enqueued due to librdkafka’s local produce queue being full.
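Applied to the loop from the question, the change might look like the sketch below. The helper name produce_with_key and the "key-N" key format are illustrative assumptions, not part of the original code; the producer argument is assumed to be a confluent_kafka Producer, or any object with a compatible produce method.

```python
def produce_with_key(producer, topic_name, iteration):
    """Enqueue one message that carries both a key and a value.

    `producer` is assumed to expose produce(topic, key=..., value=...),
    as confluent_kafka.Producer does. The key format is hypothetical.
    """
    # Encode both parts explicitly so the consumer gets predictable bytes.
    key = f"key-{iteration}".encode("utf-8")
    value = f"iteration {iteration}".encode("utf-8")
    # Passing key= is the only change relative to the question's call.
    producer.produce(topic_name, key=key, value=value)
    return key, value
```

Inside produce(), the existing p.produce(...) line would be replaced with produce_with_key(p, topic_name, curr_iteration). With a key set, message.key() on the consumer side should return the key bytes rather than None.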