Kafka - 将消息分配给特定的消费者组
Kafka - Assign messages to specific Consumer Groups
我有一个关于 Kafka 组 ID 的小问题,我可以在 Java 中使用这个 Annotaiton:
@KafkaListener(topics = "insert", groupId = "user")
在那里我可以设置一个它想要使用的 groupId,但我不只使用这个组 ID 可能是因为我无法发送到特定的组 ID。我怎样才能只发送到一个特殊的groupid?为什么我可以使用 GroupID 或者我需要设置特定的主题来发送特定的 Kafka 消息?
我已经尝试在网上找到答案,但我一无所获,也许我使用 google false 哈哈
希望大家能理解我,不明白的请追问:)
非常感谢!
欢迎来到Kafka
!首先:您不能发送给消费者组,您 发送给 Topic
.
下面的文字太多。在尝试阅读整个答案时注意可能的困倦。
如果您仍在阅读本文,我假设您确实想知道如何将消息定向到特定客户,或者您真的需要尽快睡一觉。
也许两者都有。之后不要开车。
回到你的问题。
从该主题,多个 Consumer Groups
可以阅读。每个 CG
都是独立于其他人的,所以每个人都会自己从头到尾阅读主题。 将 CG
视为内向型消费者的联盟:他们不会关心其他群体,他们永远不会与其他群体交谈,他们甚至不知道是否存在其他群体。
我可以想到三种不同的方法来实现您的目标,通过使用不同的方法 and/or 体系结构。唯一使用 Consumer Groups
的是第一个,但其他两个也可能有帮助:
subscribe
assign
- 多个主题
前两个基于在单个主题内划分消息的机制。第三个应该只在某些情况下是合理的。让我们进入这些选项。
1. Subscribe and Consumer Groups
您可以创建一个新主题,用消息填充它,然后添加一些元数据以便识别谁需要处理每条消息(给谁消息被定向).
存储在 Kafka 中的消息包含 KEY
和 VALUE
(消息本身)。
假设您只想 GROUP-A
处理一些特定的消息。一种简单的解决方案是在密钥上包含一个标识符,例如 后缀 。您的其中一个密钥可能如下所示:key#GA
.
在消费者方面,您 poll()
来自该主题的消息,并在处理它之前添加一些额外的条件逻辑:您只需读取密钥并检查后缀。如果它对应于指定的消费者组,在本例中,它包含 GA
,那么来自 GROUP-A
的消费者知道它必须处理消息。
例如,您的主题存储两种不同性质的消息,您希望将它们定向到两个组:GROUP-A
和 GROUP-Z
。
key value
- [11#GA][MESSAGE]
- [21#GZ][MESSAGE]
- [33#GZ][MESSAGE]
- [44#GA][MESSAGE]
两个消费者组都会轮询这 4 条消息,但每个组只会处理其中的一些消息。
Group-A
将丢弃第 2 条和第 3 条消息。 它将处理第 1 和第 4 个。
Group-Z
将丢弃第 1 条和第 4 条消息。 它将处理第二个和第三个。
这基本上就是您的目标,但使用了一些额外的逻辑并使用了 Kafka 的架构。带有特定后缀的消息将被“定向”给特定的消费群体,而被其他消费群体忽略。
2. Assign
上述解决方案侧重于消费者群体和 Kafka 的 subscribe
方法论。另一种可能的解决方案是使用 Kafka 的 assign
方法,而不是 订阅消费者组 。这里不涉及 ConsumerGroup
,因此将引用前面的 组 以避免混淆。
Assign
允许您直接指定您的消费者必须从中读取的topic/partition。
在生产者方面,您应该使用自己的逻辑对消息进行分区,以便在主题内的分区之间划分它们。关于自定义分区器的一些更深入的信息 here ( 是的,来自 link 的作者似乎是个彻头彻尾的混蛋 )。
例如,假设您有 5 种不同类型 的 consumers
。因此,您创建了一个包含 5 个分区的 Topic
,每个“组”一个。您的 producer
的自定义分区程序为每条消息标识相应的分区,主题将在生成上一个示例的消息后呈现此结构:
为了将消息定向到其相应的 "组" :
"Group-Z"
分配给第 5 个分区。
"Group-A"
分配了第一个分区。
此解决方案的优势 是浪费的资源较少:每个“组”只轮询自己的消息,并且每条消息都经过验证以定向到消费者轮询它,您避免了 discard/accept 逻辑: 线路上的流量减少,内存中的对象减少,cpu 工作减少 .
缺点 包含一个更复杂的 Kafka 生产者机制,它涉及一个自定义分区程序,最肯定应该 不断 更新关于更改您的数据或主题结构。此外,每次更改生产者端时,这也会导致更新消费者的定义分配。
个人说明:
Assign
提供更好的性能,但代价高昂:对生产者、主题、分区和消费者进行手动和持续控制,因此(可能)更多容易出错。我将其称为 高效 解决方案。
Subscribe
使所有过程变得更加简单,并且可能涉及更少的系统 problems/error,因此更 可靠 。我将其称为 有效 解决方案。
总之,这是一个完全主观的意见。
还未完成
之前提出的解决方案假设消息具有相同的性质,因此将在相同的主题中生成。
为了解释我在这里要说的内容,假设一个 Topic 表示为一个存储建筑物。
<--laptops, tables, smartphones,...
之前的解决方案假设您在那里存储相似的元素,例如电子设备;它们的使用寿命相似,无论具体设备类型如何,存储方法都相似,您使用的机器是相同的等等。考虑到这一点,将所有这些元素存储在同一个仓库中是完全合乎逻辑的,分为不同的部分(分为同一主题,分为不同的分区)。
没有真正的理由为每个电子设备系列建立一个新仓库(一个用于电视,另一个用于手机,...... 除非你被钱包裹 ).之前的解决方案假设您的消息是不同类型的“电子设备”。
但是时间过去了,你做得很好,所以决定开始一项新业务:水果储存。
水果的寿命较短(log.retention.ms
有人吗?),必须在一定的温度范围内储存,并且您的设备存储元素和第一个仓库的技术可能会有很大不同。此外,您的水果业务可能会在一年中的某些时期关闭,而电子设备则 365/24 接收。即使你每天都打开你的设备仓库,也许水果仓库只在周一和周二工作(幸运的是不会因为这段时间而暂时关闭)。
由于水果和电子设备需要不同类型的存储管理,您决定新建一个仓库。你的新水果主题。
<--bananas, kiwis, apples, chicozapotes,...
在这里创建第二个主题是合理的,因为每个主题可能需要不同的配置值,并且每个主题都存储不同性质的内容。这导致消费者的处理逻辑也非常不同。
那么,这是第三种可能的解决方案吗?
好吧,它确实让你忘记了消费者组、分区机制、手动分配等。你只需要决定哪些消费者订阅哪个Topic,你就是完成:您有效地将消息定向到特定的消费者。
但是,如果你建一个仓库开始存放电脑,你真的会再建一个仓库来存放刚到的手机吗?在现实生活中,你必须支付建造第二栋楼的费用,以及支付两份税款,支付两栋大楼的清洁费,等等。
laptops here->
<-tablets here
在 kafka 的世界中,这将表示为 kafka 集群的额外工作(两次复制请求,zookeeper 有一个新的 ACL 和控制器的新生儿,......),分配给这项工作的人的额外时间, since now 负责管理两个主题:员工将时间花在可以避免的事情上是公司损失的 €€€ 的同义词。另外,我不知道他们是否已经这样做或将来会这样做,但是云提供商在某种程度上喜欢对某些操作征收少量税款,例如创建主题(但这只是一种可能性,我可能这里错了).
要恢复,这不一定是个坏主意:它只需要一个合理的上下文。如果您使用 Bananas 和 Qualcomm 芯片,请使用它。
如果您使用的是笔记本电脑和平板电脑,请选择之前显示的消费者组和分区解决方案。
我有一个关于 Kafka 组 ID 的小问题,我可以在 Java 中使用这个 Annotaiton:
@KafkaListener(topics = "insert", groupId = "user")
在那里我可以设置一个它想要使用的 groupId,但我不只使用这个组 ID 可能是因为我无法发送到特定的组 ID。我怎样才能只发送到一个特殊的groupid?为什么我可以使用 GroupID 或者我需要设置特定的主题来发送特定的 Kafka 消息?
我已经尝试在网上找到答案,但我一无所获,也许我使用 google false 哈哈
希望大家能理解我,不明白的请追问:)
非常感谢!
欢迎来到Kafka
!首先:您不能发送给消费者组,您 发送给 Topic
.
下面的文字太多。在尝试阅读整个答案时注意可能的困倦。
如果您仍在阅读本文,我假设您确实想知道如何将消息定向到特定客户,或者您真的需要尽快睡一觉。
也许两者都有。之后不要开车。
回到你的问题。
从该主题,多个 Consumer Groups
可以阅读。每个 CG
都是独立于其他人的,所以每个人都会自己从头到尾阅读主题。 将 CG
视为内向型消费者的联盟:他们不会关心其他群体,他们永远不会与其他群体交谈,他们甚至不知道是否存在其他群体。
我可以想到三种不同的方法来实现您的目标,通过使用不同的方法 and/or 体系结构。唯一使用 Consumer Groups
的是第一个,但其他两个也可能有帮助:
subscribe
assign
- 多个主题
前两个基于在单个主题内划分消息的机制。第三个应该只在某些情况下是合理的。让我们进入这些选项。
1. Subscribe and Consumer Groups
您可以创建一个新主题,用消息填充它,然后添加一些元数据以便识别谁需要处理每条消息(给谁消息被定向).
存储在 Kafka 中的消息包含 KEY
和 VALUE
(消息本身)。
假设您只想 GROUP-A
处理一些特定的消息。一种简单的解决方案是在密钥上包含一个标识符,例如 后缀 。您的其中一个密钥可能如下所示:key#GA
.
在消费者方面,您 poll()
来自该主题的消息,并在处理它之前添加一些额外的条件逻辑:您只需读取密钥并检查后缀。如果它对应于指定的消费者组,在本例中,它包含 GA
,那么来自 GROUP-A
的消费者知道它必须处理消息。
例如,您的主题存储两种不同性质的消息,您希望将它们定向到两个组:GROUP-A
和 GROUP-Z
。
key value
- [11#GA][MESSAGE]
- [21#GZ][MESSAGE]
- [33#GZ][MESSAGE]
- [44#GA][MESSAGE]
两个消费者组都会轮询这 4 条消息,但每个组只会处理其中的一些消息。
Group-A
将丢弃第 2 条和第 3 条消息。 它将处理第 1 和第 4 个。Group-Z
将丢弃第 1 条和第 4 条消息。 它将处理第二个和第三个。
这基本上就是您的目标,但使用了一些额外的逻辑并使用了 Kafka 的架构。带有特定后缀的消息将被“定向”给特定的消费群体,而被其他消费群体忽略。
2. Assign
上述解决方案侧重于消费者群体和 Kafka 的 subscribe
方法论。另一种可能的解决方案是使用 Kafka 的 assign
方法,而不是 订阅消费者组 。这里不涉及 ConsumerGroup
,因此将引用前面的 组 以避免混淆。
Assign
允许您直接指定您的消费者必须从中读取的topic/partition。
在生产者方面,您应该使用自己的逻辑对消息进行分区,以便在主题内的分区之间划分它们。关于自定义分区器的一些更深入的信息 here ( 是的,来自 link 的作者似乎是个彻头彻尾的混蛋 )。
例如,假设您有 5 种不同类型 的 consumers
。因此,您创建了一个包含 5 个分区的 Topic
,每个“组”一个。您的 producer
的自定义分区程序为每条消息标识相应的分区,主题将在生成上一个示例的消息后呈现此结构:
为了将消息定向到其相应的 "组" :
"Group-Z"
分配给第 5 个分区。"Group-A"
分配了第一个分区。
此解决方案的优势 是浪费的资源较少:每个“组”只轮询自己的消息,并且每条消息都经过验证以定向到消费者轮询它,您避免了 discard/accept 逻辑: 线路上的流量减少,内存中的对象减少,cpu 工作减少 .
缺点 包含一个更复杂的 Kafka 生产者机制,它涉及一个自定义分区程序,最肯定应该 不断 更新关于更改您的数据或主题结构。此外,每次更改生产者端时,这也会导致更新消费者的定义分配。
个人说明:
Assign
提供更好的性能,但代价高昂:对生产者、主题、分区和消费者进行手动和持续控制,因此(可能)更多容易出错。我将其称为 高效 解决方案。
Subscribe
使所有过程变得更加简单,并且可能涉及更少的系统 problems/error,因此更 可靠 。我将其称为 有效 解决方案。
总之,这是一个完全主观的意见。
还未完成
之前提出的解决方案假设消息具有相同的性质,因此将在相同的主题中生成。
为了解释我在这里要说的内容,假设一个 Topic 表示为一个存储建筑物。
<--laptops, tables, smartphones,...
之前的解决方案假设您在那里存储相似的元素,例如电子设备;它们的使用寿命相似,无论具体设备类型如何,存储方法都相似,您使用的机器是相同的等等。考虑到这一点,将所有这些元素存储在同一个仓库中是完全合乎逻辑的,分为不同的部分(分为同一主题,分为不同的分区)。
没有真正的理由为每个电子设备系列建立一个新仓库(一个用于电视,另一个用于手机,...... 除非你被钱包裹 ).之前的解决方案假设您的消息是不同类型的“电子设备”。
但是时间过去了,你做得很好,所以决定开始一项新业务:水果储存。
水果的寿命较短(log.retention.ms
有人吗?),必须在一定的温度范围内储存,并且您的设备存储元素和第一个仓库的技术可能会有很大不同。此外,您的水果业务可能会在一年中的某些时期关闭,而电子设备则 365/24 接收。即使你每天都打开你的设备仓库,也许水果仓库只在周一和周二工作(幸运的是不会因为这段时间而暂时关闭)。
由于水果和电子设备需要不同类型的存储管理,您决定新建一个仓库。你的新水果主题。
<--bananas, kiwis, apples, chicozapotes,...
在这里创建第二个主题是合理的,因为每个主题可能需要不同的配置值,并且每个主题都存储不同性质的内容。这导致消费者的处理逻辑也非常不同。
那么,这是第三种可能的解决方案吗?
好吧,它确实让你忘记了消费者组、分区机制、手动分配等。你只需要决定哪些消费者订阅哪个Topic,你就是完成:您有效地将消息定向到特定的消费者。
但是,如果你建一个仓库开始存放电脑,你真的会再建一个仓库来存放刚到的手机吗?在现实生活中,你必须支付建造第二栋楼的费用,以及支付两份税款,支付两栋大楼的清洁费,等等。
laptops here->
<-tablets here
在 kafka 的世界中,这将表示为 kafka 集群的额外工作(两次复制请求,zookeeper 有一个新的 ACL 和控制器的新生儿,......),分配给这项工作的人的额外时间, since now 负责管理两个主题:员工将时间花在可以避免的事情上是公司损失的 €€€ 的同义词。另外,我不知道他们是否已经这样做或将来会这样做,但是云提供商在某种程度上喜欢对某些操作征收少量税款,例如创建主题(但这只是一种可能性,我可能这里错了).
要恢复,这不一定是个坏主意:它只需要一个合理的上下文。如果您使用 Bananas 和 Qualcomm 芯片,请使用它。
如果您使用的是笔记本电脑和平板电脑,请选择之前显示的消费者组和分区解决方案。