kafka 是如何决定哪个消费者读取单个消费者组中的消息的?
How does kafka decide which consumer reads a message within a single consumer group?
我想知道是否有任何逻辑可以确定哪个消费者在同一消费者组中阅读消息。我有一个主题和一个消费者组。但是,我有一个或多个消费者,因为在生产环境中部署了一个消费者,当我 运行 在本地创建我的应用程序时,会创建另一个订阅相同主题的消费者(这是一个测试项目,所以它不是真正的生产环境,我'我不担心数据丢失)。我注意到有趣的是,本地消费者总是消费任何给定的消息。所以看起来后面创建的消费者优先。
是否可以配置 kafka,使较早创建的消费者优先读取?
我的设置包括 3 个代理和 1 个消费者组 ID。另外这个 property auto.offset.reset
is set to earliest
(changin it to latest
doesn't resolve the issue). I'm using this Go library for kafka。这是我的设置代码:
import (
"log"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
func getConfig() *kafka.ConfigMap {
return &kafka.ConfigMap{
"metadata.broker.list": conf.KafkaBrokers,
"security.protocol": "SASL_SSL",
"sasl.mechanisms": "SCRAM-SHA-256",
"sasl.username": conf.KafkaUsername,
"sasl.password": conf.KafkaPassword,
"group.id": conf.KafkaGroupID,
"default.topic.config": kafka.ConfigMap{"auto.offset.reset": "earliest"},
//"debug": "generic,broker,security",
}
}
在一个消费者组中,每个分区由一个消费者消费。当消费者加入该组时,其中一个计算由每个消费者将处理的分区列表组成的分配。
在您的客户端中,可以通过 partition.assignment.strategy
. This defaults to range
which follows the implementation of Apache Kafka's RangeAssignor
进行配置。
引用 Javadoc:
The range assignor works on a per-topic basis. For each topic, we lay out the available partitions in numeric order and the consumers in lexicographic order. We then divide the number of partitions by the total number of consumers to determine the number of partitions to assign to each consumer. If it does not evenly divide, then the first few consumers will have one extra partition.
For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions, resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2.
The assignment will be:
C0: [t0p0, t0p1, t1p0, t1p1]
C1: [t0p2, t1p2]
消费者按其在经纪人端生成的会员 ID 排序。它基于消费者 client.id
和随机 UUID。
实际上,每个分区分配给哪个消费者并不重要,因此我不会过多关注该部分。相反,重要的是了解分区的分配方式并确定最适合您的用例的策略。
为了完整性,confluent-kafka-go
还支持其他策略,例如:roundrobin
和 cooperative-sticky
。
我想知道是否有任何逻辑可以确定哪个消费者在同一消费者组中阅读消息。我有一个主题和一个消费者组。但是,我有一个或多个消费者,因为在生产环境中部署了一个消费者,当我 运行 在本地创建我的应用程序时,会创建另一个订阅相同主题的消费者(这是一个测试项目,所以它不是真正的生产环境,我'我不担心数据丢失)。我注意到有趣的是,本地消费者总是消费任何给定的消息。所以看起来后面创建的消费者优先。
是否可以配置 kafka,使较早创建的消费者优先读取?
我的设置包括 3 个代理和 1 个消费者组 ID。另外这个 property auto.offset.reset
is set to earliest
(changin it to latest
doesn't resolve the issue). I'm using this Go library for kafka。这是我的设置代码:
import (
"log"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
func getConfig() *kafka.ConfigMap {
return &kafka.ConfigMap{
"metadata.broker.list": conf.KafkaBrokers,
"security.protocol": "SASL_SSL",
"sasl.mechanisms": "SCRAM-SHA-256",
"sasl.username": conf.KafkaUsername,
"sasl.password": conf.KafkaPassword,
"group.id": conf.KafkaGroupID,
"default.topic.config": kafka.ConfigMap{"auto.offset.reset": "earliest"},
//"debug": "generic,broker,security",
}
}
在一个消费者组中,每个分区由一个消费者消费。当消费者加入该组时,其中一个计算由每个消费者将处理的分区列表组成的分配。
在您的客户端中,可以通过 partition.assignment.strategy
. This defaults to range
which follows the implementation of Apache Kafka's RangeAssignor
进行配置。
引用 Javadoc:
The range assignor works on a per-topic basis. For each topic, we lay out the available partitions in numeric order and the consumers in lexicographic order. We then divide the number of partitions by the total number of consumers to determine the number of partitions to assign to each consumer. If it does not evenly divide, then the first few consumers will have one extra partition.
For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions, resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2.
The assignment will be:
C0: [t0p0, t0p1, t1p0, t1p1] C1: [t0p2, t1p2]
消费者按其在经纪人端生成的会员 ID 排序。它基于消费者 client.id
和随机 UUID。
实际上,每个分区分配给哪个消费者并不重要,因此我不会过多关注该部分。相反,重要的是了解分区的分配方式并确定最适合您的用例的策略。
为了完整性,confluent-kafka-go
还支持其他策略,例如:roundrobin
和 cooperative-sticky
。