在 Python 中指示 group_id 时,Kafka 未收到消息
Kafka not receiving messages when indicating group_id in Python
我正在使用 Kafka (kafka-python
) 版本 3.0.0-1.3.0.0.p0.40。我需要为 Python 中的主题 'simulation' 配置消费者。当我不指示 group_id,即 group_id = None 时,它可以正常接收消息。但是,如果我指示 group_id,它不会收到任何消息。
这是我在 Python 中的代码:
consumer = KafkaConsumer(bootstrap_servers='XXX.XXX.XXX.XXX:9092',
group_id = 'myTestGroupID', enable_auto_commit = True)
consumer.subscribe(['simulation'])
# not using assign method here as auto_commit is enabled
# partitions = [TopicPartition('simulation',num) for num in range(0,9)]
# consumer.assign([TopicPartition('simulation', partitions[0])])
while not self.stop_event.is_set():
for message in consumer:
print(message)
我试图在消费者属性文件中搜索 group_id 的一些默认值,我找到了一个 cloudera_mirrormaker 但没有任何改变。
我将需要使用多个消费者,因此我有一个 group_id 并且他们共享相同的 group_id 很重要。
在许多来源中,我发现 group_id 可以是任何字符串...
当我 运行 控制台中此主题的使用者时,它可以工作并接收消息
./kafka-console-consumer.sh --bootstrap-server XXX.XXX.XXX.XXX:9092 --topic simulation --from-beginning --consumer-property group.id=myTestGroupID --partition 0
当我运行宁 kafka-consumer-groups.sh 列出所有可用组时它是空的。
如果有人知道它卡在 Python 中的原因,我们将不胜感激。
非常感谢
这是生产者的代码(为了简单起见,我已经减少了它,因为在这种情况下它不会改变问题)
from kafka import KafkaProducer
class Producer(threading.Thread):
...
def run(self):
producer = KafkaProducer(bootstrap_servers='XXX.XXX.XXX.XXX:9092')
while not self.stop_event.is_set():
string = 'test %s' %time.time()
producer.send('simulation', string.encode())
time.sleep(0.5)
producer.close()
Question: Kafka not receiving messages when indicating group_id
Try, pass the 'topic' on KafkaConsumer
instantiating, like in the Documentation:
# join a consumer group for dynamic partition assignment and offset commits
from kafka import KafkaConsumer
consumer = KafkaConsumer('simulation', group_id='myTestGroupID')
for msg in consumer:
print (msg)
The Documentation: KafkaConsumer is clear about the type of group-id
:
group_id (str or None) – The name of the consumer group to join for dynamic partition assignment (if enabled), and to use for fetching and committing offsets. If None, auto-partition assignment (via group coordinator) and offset commits are disabled. Default: None
我遇到了同样的问题,在处理高延迟环境和大消息 (>1Mb) 时部分收不到消息(大部分消息都丢失了)。
我没有花太多精力去寻找根本原因,但我的猜测是消费者重新平衡是在消息处理完成之前启动的,这似乎会在没有其他消费者可用时引起问题(在我的如果我有一个消费者或两个消费者遇到同样的问题)。
我的诀窍是增加 max_poll_interval_ms
并设置 max_poll_records=1
consumer = KafkaConsumer(bootstrap_servers='XXX.XXX.XXX.XXX:9092',
group_id = 'myTestGroupID',
enable_auto_commit = True,
max_poll_interval_ms=5000,
max_poll_records=1)
您可以在以下位置找到更多信息:
https://kafka.apache.org/23/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
在检测消费者故障部分。
终于解决了
那是我的问题:omkafka
配置文件 partitions.number
属性默认为 1
。
我们根据需要将其更改为 100
,它开始工作了!希望对你有所帮助
我正在使用 Kafka (kafka-python
) 版本 3.0.0-1.3.0.0.p0.40。我需要为 Python 中的主题 'simulation' 配置消费者。当我不指示 group_id,即 group_id = None 时,它可以正常接收消息。但是,如果我指示 group_id,它不会收到任何消息。
这是我在 Python 中的代码:
consumer = KafkaConsumer(bootstrap_servers='XXX.XXX.XXX.XXX:9092',
group_id = 'myTestGroupID', enable_auto_commit = True)
consumer.subscribe(['simulation'])
# not using assign method here as auto_commit is enabled
# partitions = [TopicPartition('simulation',num) for num in range(0,9)]
# consumer.assign([TopicPartition('simulation', partitions[0])])
while not self.stop_event.is_set():
for message in consumer:
print(message)
我试图在消费者属性文件中搜索 group_id 的一些默认值,我找到了一个 cloudera_mirrormaker 但没有任何改变。 我将需要使用多个消费者,因此我有一个 group_id 并且他们共享相同的 group_id 很重要。 在许多来源中,我发现 group_id 可以是任何字符串...
当我 运行 控制台中此主题的使用者时,它可以工作并接收消息
./kafka-console-consumer.sh --bootstrap-server XXX.XXX.XXX.XXX:9092 --topic simulation --from-beginning --consumer-property group.id=myTestGroupID --partition 0
当我运行宁 kafka-consumer-groups.sh 列出所有可用组时它是空的。
如果有人知道它卡在 Python 中的原因,我们将不胜感激。 非常感谢
这是生产者的代码(为了简单起见,我已经减少了它,因为在这种情况下它不会改变问题)
from kafka import KafkaProducer
class Producer(threading.Thread):
...
def run(self):
producer = KafkaProducer(bootstrap_servers='XXX.XXX.XXX.XXX:9092')
while not self.stop_event.is_set():
string = 'test %s' %time.time()
producer.send('simulation', string.encode())
time.sleep(0.5)
producer.close()
Question: Kafka not receiving messages when indicating group_id
Try, pass the 'topic' on
KafkaConsumer
instantiating, like in the Documentation:# join a consumer group for dynamic partition assignment and offset commits from kafka import KafkaConsumer consumer = KafkaConsumer('simulation', group_id='myTestGroupID') for msg in consumer: print (msg)
The Documentation: KafkaConsumer is clear about the type of
group-id
:group_id (str or None) – The name of the consumer group to join for dynamic partition assignment (if enabled), and to use for fetching and committing offsets. If None, auto-partition assignment (via group coordinator) and offset commits are disabled. Default: None
我遇到了同样的问题,在处理高延迟环境和大消息 (>1Mb) 时部分收不到消息(大部分消息都丢失了)。
我没有花太多精力去寻找根本原因,但我的猜测是消费者重新平衡是在消息处理完成之前启动的,这似乎会在没有其他消费者可用时引起问题(在我的如果我有一个消费者或两个消费者遇到同样的问题)。
我的诀窍是增加 max_poll_interval_ms 并设置 max_poll_records=1
consumer = KafkaConsumer(bootstrap_servers='XXX.XXX.XXX.XXX:9092',
group_id = 'myTestGroupID',
enable_auto_commit = True,
max_poll_interval_ms=5000,
max_poll_records=1)
您可以在以下位置找到更多信息: https://kafka.apache.org/23/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html 在检测消费者故障部分。
终于解决了
那是我的问题:omkafka
配置文件 partitions.number
属性默认为 1
。
我们根据需要将其更改为 100
,它开始工作了!希望对你有所帮助