如果将 group_id 设置为 None,Kafka 消费者会收到消息,但如果不是 None,它不会收到任何消息?
Kafka consumer receives message if set group_id to None, but it doesn't receive any message if not None?
我有以下 Kafka 消费者,如果将 group_id
分配给 None,它运行良好 - 它收到了所有历史消息和我新测试的消息。
consumer = KafkaConsumer(
topic,
bootstrap_servers=bootstrap_servers,
auto_offset_reset=auto_offset_reset,
enable_auto_commit=enable_auto_commit,
group_id=group_id,
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
for m in consumer:
但是,如果我将 group_id
设置为某个值,它不会收到任何东西。我尝试 运行 测试生产者发送新消息,但没有收到任何消息。
消费者控制台确实显示以下消息:
2020-11-07 00:56:01 INFO ThreadPoolExecutor-0_0 base.py (Re-)joining group my_group
2020-11-07 00:56:07 INFO ThreadPoolExecutor-0_0 base.py Successfully joined group my_group with generation 497
2020-11-07 00:56:07 INFO ThreadPoolExecutor-0_0 subscription_state.py Updated partition assignment: []
2020-11-07 00:56:07 INFO ThreadPoolExecutor-0_0 consumer.py Setting newly assigned partitions set() for group my_group
一个主题的一个分区只能由同一个消费者组中的一个消费者消费。
如果您不设置 group.id,KafkaConsumer 将为您生成一个新的随机 group.id。由于此 group.id 是唯一的,您将看到正在消耗数据。
如果你有多个消费者 运行 相同 group.id,只有一个消费者会读取数据,而另一个则保持空闲状态不消费任何东西。
我知道,这不是解决作者问题的方法。尽管如此,如果您降落在这里,您可能会出于其他原因遇到此问题。和我一样。
因此,至少对于 kafka-python v2.0.2 和 Aiven Kafka 代理设置,通过添加 consumer.poll() 的空调用解决了问题。
这特别奇怪,因为当没有分配 group_id 时不需要这样做。
输出自:
def get():
for message in consumer:
print(message.value)
consumer.commit()
在这种情况下什么都不是
虽然下面按预期工作。它只读出来自上次提交的新消息():
输出自:
def get():
consumer.poll()
for message in consumer:
print(message.value)
consumer.commit()
它按预期输出了自上次提交以来该主题中的所有消息
仅供参考,class 构造函数如下所示:
consumer = KafkaConsumer(
topics,
bootstrap_servers=self._service_uri,
auto_offset_reset='earliest',
enable_auto_commit=False,
client_id='my_consumer_name',
group_id=self.GROUP_ID,
security_protocol="SSL",
ssl_cafile=self._ca_path,
ssl_certfile=self._cert_path,
ssl_keyfile=self._key_path,
)
¯\_(ツ)_/¯
我有以下 Kafka 消费者,如果将 group_id
分配给 None,它运行良好 - 它收到了所有历史消息和我新测试的消息。
consumer = KafkaConsumer(
topic,
bootstrap_servers=bootstrap_servers,
auto_offset_reset=auto_offset_reset,
enable_auto_commit=enable_auto_commit,
group_id=group_id,
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
for m in consumer:
但是,如果我将 group_id
设置为某个值,它不会收到任何东西。我尝试 运行 测试生产者发送新消息,但没有收到任何消息。
消费者控制台确实显示以下消息:
2020-11-07 00:56:01 INFO ThreadPoolExecutor-0_0 base.py (Re-)joining group my_group 2020-11-07 00:56:07 INFO ThreadPoolExecutor-0_0 base.py Successfully joined group my_group with generation 497 2020-11-07 00:56:07 INFO ThreadPoolExecutor-0_0 subscription_state.py Updated partition assignment: [] 2020-11-07 00:56:07 INFO ThreadPoolExecutor-0_0 consumer.py Setting newly assigned partitions set() for group my_group
一个主题的一个分区只能由同一个消费者组中的一个消费者消费。
如果您不设置 group.id,KafkaConsumer 将为您生成一个新的随机 group.id。由于此 group.id 是唯一的,您将看到正在消耗数据。
如果你有多个消费者 运行 相同 group.id,只有一个消费者会读取数据,而另一个则保持空闲状态不消费任何东西。
我知道,这不是解决作者问题的方法。尽管如此,如果您降落在这里,您可能会出于其他原因遇到此问题。和我一样。
因此,至少对于 kafka-python v2.0.2 和 Aiven Kafka 代理设置,通过添加 consumer.poll() 的空调用解决了问题。 这特别奇怪,因为当没有分配 group_id 时不需要这样做。
输出自:
def get():
for message in consumer:
print(message.value)
consumer.commit()
在这种情况下什么都不是
虽然下面按预期工作。它只读出来自上次提交的新消息():
输出自:
def get():
consumer.poll()
for message in consumer:
print(message.value)
consumer.commit()
它按预期输出了自上次提交以来该主题中的所有消息
仅供参考,class 构造函数如下所示:
consumer = KafkaConsumer(
topics,
bootstrap_servers=self._service_uri,
auto_offset_reset='earliest',
enable_auto_commit=False,
client_id='my_consumer_name',
group_id=self.GROUP_ID,
security_protocol="SSL",
ssl_cafile=self._ca_path,
ssl_certfile=self._cert_path,
ssl_keyfile=self._key_path,
)
¯\_(ツ)_/¯