kafka-python 1.3.3: KafkaProducer.send with explicit key 无法向 broker 发送消息
kafka-python 1.3.3: KafkaProducer.send with explicit key fails to send message to broker
(可能 Can't send a keyedMessage to brokers with partitioner.class=kafka.producer.DefaultPartitioner 的副本,尽管该问题的 OP 没有提到 kafka-python。无论如何,它从来没有得到一个答案。)
我有一个 Python 程序已经成功(许多个月)使用以下逻辑向 Kafka 代理发送消息:
producer = kafka.KafkaProducer(bootstrap_servers=[some_addr],
retries=3)
...
msg = json.dumps(some_message)
res = producer.send(some_topic, value=msg)
最近,我尝试升级它,根据从消息中提取的确定键值将消息发送到不同的分区:
producer = kafka.KafkaProducer(bootstrap_servers=[some_addr],
key_serializer=str.encode,
retries=3)
...
try:
key = some_message[0]
except:
key = None
msg = json.dumps(some_message)
res = producer.send(some_topic, value=msg, key=key)
但是,使用此代码,没有 消息会从程序中发送到代理。我已验证从 some_message
中提取的键值始终是有效字符串。大概我不需要定义自己的 partitioner
,因为根据文档:
The default partitioner implementation hashes each non-None key using the same murmur2 algorithm as the java client so that messages with the same key are assigned to the same partition.
此外,使用新代码,当我尝试通过调用 res.get
(以获得 kafka.FutureRecordMetadata
)来确定我的 send
发生了什么时, 调用引发 TypeError
异常,消息为 descriptor 'encode' requires a 'str' object but received a 'unicode'
。
(作为附带问题,我不确定如果我真的能够得到它,我会用 FutureRecordMetadata
做什么。基于 kafka-python 源代码,我假设我想调用它的 succeeded
或它的 failed
方法,但文档在这一点上没有提及。文档 does 说return send
"resolves to" RecordMetadata
的值,但我无法从文档或代码中弄清楚 "resolves to" 在这个语境。)
无论如何:我不是唯一一个使用 kafka-python 1.3.3 的人,他曾尝试使用分区键发送消息,而且我在 Intertubes 上没有看到任何描述类似问题的内容(除了我在此 post 顶部引用的 SO 问题)。
我当然愿意相信我做错了什么,但我不知道那可能是什么。我需要向 KafkaProducer
构造函数提供一些额外的参数吗?
根本问题是我的键值是 unicode
,尽管我非常确信它是 str
。因此,为我的 key_serializer
选择 str.encode
是不合适的,并且是导致 res.get
例外的原因。省略 key_serializer
并调用 key.encode('utf-8')
足以发布我的消息,并按预期进行分区。
这个问题(对我来说)模糊不清的一个重要原因是 kafka-python 1.3.3 documentation 没有详细说明 FutureRecordMetadata
确实如此,也不应该以其 get
方法可以引发的异常方式期望什么。文档中唯一的用法示例:
# Asynchronous by default
future = producer.send('my-topic', b'raw_bytes')
# Block for 'synchronous' sends
try:
record_metadata = future.get(timeout=10)
except KafkaError:
# Decide what to do if produce request failed...
log.exception()
pass
表明它会引发的唯一异常是 KafkaError
,这不是真的。事实上,get
可以并且将会(重新)引发 any 异步发布机制在尝试将消息发送出去时遇到的异常。
我也遇到了同样的错误。一旦我在发送密钥时添加了 json.dumps,它就起作用了。
producer.send(topic="first_topic", key=json.dumps(key)
.encode('utf-8'), value=json.dumps(msg)
.encode('utf-8'))
.add_callback(on_send_success).add_errback(on_send_error)
(可能 Can't send a keyedMessage to brokers with partitioner.class=kafka.producer.DefaultPartitioner 的副本,尽管该问题的 OP 没有提到 kafka-python。无论如何,它从来没有得到一个答案。)
我有一个 Python 程序已经成功(许多个月)使用以下逻辑向 Kafka 代理发送消息:
producer = kafka.KafkaProducer(bootstrap_servers=[some_addr],
retries=3)
...
msg = json.dumps(some_message)
res = producer.send(some_topic, value=msg)
最近,我尝试升级它,根据从消息中提取的确定键值将消息发送到不同的分区:
producer = kafka.KafkaProducer(bootstrap_servers=[some_addr],
key_serializer=str.encode,
retries=3)
...
try:
key = some_message[0]
except:
key = None
msg = json.dumps(some_message)
res = producer.send(some_topic, value=msg, key=key)
但是,使用此代码,没有 消息会从程序中发送到代理。我已验证从 some_message
中提取的键值始终是有效字符串。大概我不需要定义自己的 partitioner
,因为根据文档:
The default partitioner implementation hashes each non-None key using the same murmur2 algorithm as the java client so that messages with the same key are assigned to the same partition.
此外,使用新代码,当我尝试通过调用 res.get
(以获得 kafka.FutureRecordMetadata
)来确定我的 send
发生了什么时, 调用引发 TypeError
异常,消息为 descriptor 'encode' requires a 'str' object but received a 'unicode'
。
(作为附带问题,我不确定如果我真的能够得到它,我会用 FutureRecordMetadata
做什么。基于 kafka-python 源代码,我假设我想调用它的 succeeded
或它的 failed
方法,但文档在这一点上没有提及。文档 does 说return send
"resolves to" RecordMetadata
的值,但我无法从文档或代码中弄清楚 "resolves to" 在这个语境。)
无论如何:我不是唯一一个使用 kafka-python 1.3.3 的人,他曾尝试使用分区键发送消息,而且我在 Intertubes 上没有看到任何描述类似问题的内容(除了我在此 post 顶部引用的 SO 问题)。
我当然愿意相信我做错了什么,但我不知道那可能是什么。我需要向 KafkaProducer
构造函数提供一些额外的参数吗?
根本问题是我的键值是 unicode
,尽管我非常确信它是 str
。因此,为我的 key_serializer
选择 str.encode
是不合适的,并且是导致 res.get
例外的原因。省略 key_serializer
并调用 key.encode('utf-8')
足以发布我的消息,并按预期进行分区。
这个问题(对我来说)模糊不清的一个重要原因是 kafka-python 1.3.3 documentation 没有详细说明 FutureRecordMetadata
确实如此,也不应该以其 get
方法可以引发的异常方式期望什么。文档中唯一的用法示例:
# Asynchronous by default
future = producer.send('my-topic', b'raw_bytes')
# Block for 'synchronous' sends
try:
record_metadata = future.get(timeout=10)
except KafkaError:
# Decide what to do if produce request failed...
log.exception()
pass
表明它会引发的唯一异常是 KafkaError
,这不是真的。事实上,get
可以并且将会(重新)引发 any 异步发布机制在尝试将消息发送出去时遇到的异常。
我也遇到了同样的错误。一旦我在发送密钥时添加了 json.dumps,它就起作用了。
producer.send(topic="first_topic", key=json.dumps(key)
.encode('utf-8'), value=json.dumps(msg)
.encode('utf-8'))
.add_callback(on_send_success).add_errback(on_send_error)