如果将 group_id 设置为 None,Kafka 消费者会收到消息,但如果不是 None,它不会收到任何消息?

Kafka consumer receives message if set group_id to None, but it doesn't receive any message if not None?

我有以下 Kafka 消费者,如果将 group_id 分配给 None,它运行良好 - 它收到了所有历史消息和我新测试的消息。

consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        auto_offset_reset=auto_offset_reset,
        enable_auto_commit=enable_auto_commit,
        group_id=group_id,
        value_deserializer=lambda x: json.loads(x.decode('utf-8'))
    )

for m in consumer:

但是,如果我将 group_id 设置为某个值,它不会收到任何东西。我尝试 运行 测试生产者发送新消息,但没有收到任何消息。

消费者控制台确实显示以下消息:

2020-11-07 00:56:01 INFO     ThreadPoolExecutor-0_0 base.py (Re-)joining group my_group
2020-11-07 00:56:07 INFO     ThreadPoolExecutor-0_0 base.py Successfully joined group my_group with generation 497
2020-11-07 00:56:07 INFO     ThreadPoolExecutor-0_0 subscription_state.py Updated partition assignment: []
2020-11-07 00:56:07 INFO     ThreadPoolExecutor-0_0 consumer.py Setting newly assigned partitions set() for group my_group

一个主题的一个分区只能由同一个消费者组中的一个消费者消费。

如果您不设置 group.id,KafkaConsumer 将为您生成一个新的随机 group.id。由于此 group.id 是唯一的,您将看到正在消耗数据。

如果你有多个消费者 运行 相同 group.id,只有一个消费者会读取数据,而另一个则保持空闲状态不消费任何东西。

我知道,这不是解决作者问题的方法。尽管如此,如果您降落在这里,您可能会出于其他原因遇到此问题。和我一样。

因此,至少对于 kafka-python v2.0.2 和 Aiven Kafka 代理设置,通过添加 consumer.poll() 的空调用解决了问题。 这特别奇怪,因为当没有分配 group_id 时不需要这样做。

输出自:

def get():
    for message in consumer:
        print(message.value)
    consumer.commit()

在这种情况下什么都不是

虽然下面按预期工作。它只读出来自上次提交的新消息():

输出自:

def get():
    consumer.poll()
    for message in consumer:
        print(message.value)
    consumer.commit()

它按预期输出了自上次提交以来该主题中的所有消息

仅供参考,class 构造函数如下所示:

    consumer = KafkaConsumer(
        topics,
        bootstrap_servers=self._service_uri,
        auto_offset_reset='earliest',
        enable_auto_commit=False,
        client_id='my_consumer_name',
        group_id=self.GROUP_ID,
        security_protocol="SSL",
        ssl_cafile=self._ca_path,
        ssl_certfile=self._cert_path,
        ssl_keyfile=self._key_path,
    )

¯\_(ツ)_/¯