Kafka message key is None

Here is my implementation of a Kafka producer and consumer:

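import asyncio

from confluent_kafka import Consumer, Producer

# BROKER_URL is assumed to be defined elsewhere.


async def produce(topic_name):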
    """Produces data into the Kafka Topic"""
    p = Producer({"bootstrap.servers": BROKER_URL})

    curr_iteration = 0
    while True:
        p.produce(topic_name, f"iteration {curr_iteration}".encode("utf-8"))
        curr_iteration += 1
        await asyncio.sleep(0.5)


async def consume(topic_name):
    """Consumes data from the Kafka Topic"""
    c = Consumer({"bootstrap.servers": BROKER_URL, "group.id": "0"})
    c.subscribe([topic_name])
    while True:
        message = c.poll(1.0)
        if message is None:
            print("no message received by consumer")
        elif message.error() is not None:
            print(f"error from consumer {message.error()}")
        else:
            print(f"consumed message {message.key()}: {message.value()}")
        await asyncio.sleep(2.5)

message.key() returns None. The console output looks like this:

consumed message None: b'iteration 1'
consumed message None: b'iteration 8'
consumed message None: b'iteration 12'
consumed message None: b'iteration 15'

How can I update the code so that the consumer receives a message key?

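The producer never sets a key, so the consumer has nothing to read. Update your produce call to pass one, like this: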

p.produce(topic, key="key", value="value")

To initiate sending a message to Kafka, call the produce method, passing in the message value (which may be None) and optionally a key, partition, and callback. The produce call will complete immediately and does not return a value. A KafkaException will be thrown if the message could not be enqueued due to librdkafka’s local produce queue being full.

Source: clients-confluent-kafka-python
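Applied to the code in the question, the change could look roughly like the sketch below. The helper names (`build_record`, `produce_iteration`, `describe`) and the `key-<n>` key scheme are made up for illustration; any bytes or str key works. `producer` is assumed to be a `confluent_kafka.Producer`, and the only calls used on it are `produce(topic, key=..., value=...)` and `poll(0)`.

```python
def build_record(iteration):
    """Return a (key, value) pair of bytes for one iteration."""
    # Hypothetical key scheme; pick whatever identifies your records.
    key = f"key-{iteration}".encode("utf-8")
    value = f"iteration {iteration}".encode("utf-8")
    return key, value


def produce_iteration(producer, topic_name, iteration):
    """Enqueue one keyed message on a confluent_kafka.Producer-like object."""
    key, value = build_record(iteration)
    # Passing key= is what makes message.key() non-None on the consumer side.
    producer.produce(topic_name, key=key, value=value)
    # Serve any pending delivery callbacks without blocking.
    producer.poll(0)


def describe(key, value):
    """Format a consumed key/value pair; key is None if the producer sent none."""
    key_text = key.decode("utf-8") if key is not None else None
    return f"consumed message {key_text}: {value.decode('utf-8')}"
```

In the `produce` coroutine you would call `produce_iteration(p, topic_name, curr_iteration)` in place of the existing `p.produce(...)` line, and in `consume` you would print `describe(message.key(), message.value())`. Messages that were written before the change still have no key, so they will keep showing None.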