kafka-python 1.3.3: KafkaProducer.send with explicit key 无法向 broker 发送消息

kafka-python 1.3.3: KafkaProducer.send with explicit key fails to send message to broker

(可能 Can't send a keyedMessage to brokers with partitioner.class=kafka.producer.DefaultPartitioner 的副本,尽管该问题的 OP 没有提到 kafka-python。无论如何,它从来没有得到一个答案。)

我有一个 Python 程序已经成功(许多个月)使用以下逻辑向 Kafka 代理发送消息:

producer = kafka.KafkaProducer(bootstrap_servers=[some_addr],
                               retries=3)
...
msg = json.dumps(some_message)
res = producer.send(some_topic, value=msg)

最近,我尝试升级它,根据​​从消息中提取的确定键值将消息发送到不同的分区:

producer = kafka.KafkaProducer(bootstrap_servers=[some_addr],
                               key_serializer=str.encode,
                               retries=3)
...
try: 
    key = some_message[0]
except:
    key = None
msg = json.dumps(some_message)
res = producer.send(some_topic, value=msg, key=key)

但是,使用此代码,没有 消息会从程序中发送到代理。我已验证从 some_message 中提取的键值始终是有效字符串。大概我不需要定义自己的 partitioner,因为根据文档:

The default partitioner implementation hashes each non-None key using the same murmur2 algorithm as the java client so that messages with the same key are assigned to the same partition.

此外,使用新代码,当我尝试通过调用 res.get(以获得 kafka.FutureRecordMetadata)来确定我的 send 发生了什么时, 调用引发 TypeError 异常,消息为 descriptor 'encode' requires a 'str' object but received a 'unicode'

(作为附带问题,我不确定如果我真的能够得到它,我会用 FutureRecordMetadata 做什么。基于 kafka-python 源代码,我假设我想调用它的 succeeded 或它的 failed 方法,但文档在这一点上没有提及。文档 does 说return send "resolves to" RecordMetadata 的值,但我无法从文档或代码中弄清楚 "resolves to" 在这个语境。)

无论如何:我不是唯一一个使用 kafka-python 1.3.3 的人,他曾尝试使用分区键发送消息,而且我在 Intertubes 上没有看到任何描述类似问题的内容(除了我在此 post 顶部引用的 SO 问题)。

我当然愿意相信我做错了什么,但我不知道那可能是什么。我需要向 KafkaProducer 构造函数提供一些额外的参数吗?

根本问题是我的键值是 unicode,尽管我非常确信它是 str。因此,为我的 key_serializer 选择 str.encode 是不合适的,并且是导致 res.get 例外的原因。省略 key_serializer 并调用 key.encode('utf-8') 足以发布我的消息,并按预期进行分区。

这个问题(对我来说)模糊不清的一个重要原因是 kafka-python 1.3.3 documentation 没有详细说明 FutureRecordMetadata确实如此,也不应该以其 get 方法可以引发的异常方式期望什么。文档中唯一的用法示例:

# Asynchronous by default
future = producer.send('my-topic', b'raw_bytes')

# Block for 'synchronous' sends
try:
    record_metadata = future.get(timeout=10)
except KafkaError:
    # Decide what to do if produce request failed...
    log.exception()
    pass

表明它会引发的唯一异常是 KafkaError这不是真的。事实上,get 可以并且将会(重新)引发 any 异步发布机制在尝试将消息发送出去时遇到的异常。

我也遇到了同样的错误。一旦我在发送密钥时添加了 json.dumps,它就起作用了。

producer.send(topic="first_topic", key=json.dumps(key)
.encode('utf-8'), value=json.dumps(msg)
.encode('utf-8'))
.add_callback(on_send_success).add_errback(on_send_error)