相同键控主题无法加入 Kafka 流
Identically Keyed Topics Failing to Join in Kafka Streams
我最近在 streams 应用程序中遇到了一个我以前 运行 没有遇到过的问题,很难找到与 keying/joins 相关的问题(更新后,分区).
我有两个主题(raw_events 和 processed_users),它们的键控相同,但是当我尝试对这两个主题执行连接时,只有 一些 的连接是成功的,尽管键控相同。
工作流程(上下文)
为简洁起见,应用程序的基本工作流程如下:
- 数据通过生产者流入
raw_event
主题。
- 一系列流应用程序侦听
raw_event
主题并根据一系列业务规则(例如 IP 地址、用户等)从中提取各种实体
- 从
raw_event
主题中识别的实体被放入 preprocessing_{type}
主题中,其中包含有关提取的元数据和在 raw_event
中找到的相关信息(例如,对于用户可能是帐户名、电子邮件等)。 这些主题中的项目由 raw_event
键控。
- 另一系列流应用程序将收听各种
preprocessing_{type}
主题,并根据一系列 GlobalKTables 将这些主题加入其中,这些 GlobalKTables 代表给定实体 final_{type}
的所有已知实例。对于成功的加入,来自 final_{type}
的实例将使用来自 raw_event/preprocessing_{type}
主题的新信息进行丰富;不成功的连接将指示给定类型的新实体,然后将其键入并放入 final_{type}
主题中。 preprocessing_{type}
的所有丰富实例都插入到 processing_{type}
主题中,该主题包含实体的丰富(或新)实例以及创建它的元数据。最重要的是 - processed_{type}
主题中的项目仍然由 raw_event
键入。
- 最后,一个流应用程序 运行s 并尝试通过加入
processing_{type}
来丰富 raw_event
的原始实例,这将是相同的键控,并丰富 raw_event
包含来自丰富实体的各种信息的实例,然后将其推送到 final_event
主题。
问题
问题本身出现在上面的第 5 步(事件丰富)中,因为 raw_event
主题和 processing_users
主题之间只有一些连接按预期工作。
使用通过整个管道的 24 条记录的子集,主题中的 24 对中只有 5 对成功加入。那些有效的似乎是相同的一致的,但我没有在数据中看到任何可以表明为什么一个有效而另一个无效的内容:
raw_event keys processing_user keys
mawjuG0B9k3AiALz0_2S 0q0juG0B9k3AiALz8ApP
xEEcv20B9k3AiALzEN0m m60juG0B9k3AiALz5gU5
zqwjuG0B9k3AiALzz_tg ua0juG0B9k3AiALz7wqa
v60juG0B9k3AiALz6Aal xEEcv20B9k3AiALzEN0m
0q0juG0B9k3AiALz8ApP zqwjuG0B9k3AiALzz_tg
RK0juG0B9k3AiALz5QUw zK0juG0B9k3AiALz6Aal
0a0juG0B9k3AiALz6Aal Ta0juG0B9k3AiALz5QUw
8KwjuG0B9k3AiALz1v58 RKwjuG0B9k3AiALz1P7C
c60juG0B9k3AiALz5gU4 -60juG0B9k3AiALz3gGn
RKwjuG0B9k3AiALz1P7C Va0juG0B9k3AiALz5QUw
zK0juG0B9k3AiALz6Aal 560juG0B9k3AiALz3QGh
Ta0juG0B9k3AiALz5QUw mawjuG0B9k3AiALz0_2S
Va0juG0B9k3AiALz5QUw -K0juG0B9k3AiALz3QGh
pK0juG0B9k3AiALz5gU5 zq0juG0B9k3AiALz6Aal
Xa0juG0B9k3AiALz2QCh RK0juG0B9k3AiALz5QUw
560juG0B9k3AiALz3QGh v60juG0B9k3AiALz6Aal
-K0juG0B9k3AiALz3QGh Xa0juG0B9k3AiALz2QCh
-60juG0B9k3AiALz3gGn P60juG0B9k3AiALz5QUw
F60juG0B9k3AiALz3gKn pK0juG0B9k3AiALz5gU5
m60juG0B9k3AiALz5gU5 0a0juG0B9k3AiALz6Aal
zq0juG0B9k3AiALz6Aal 3K0juG0B9k3AiALz3QGh
ua0juG0B9k3AiALz7wqa 8KwjuG0B9k3AiALz1v58
3K0juG0B9k3AiALz3QGh F60juG0B9k3AiALz3gKn
P60juG0B9k3AiALz5QUw c60juG0B9k3AiALz5gU4
我已经尝试将主题作为 KStreams 和 KTables 进行组合(以及我能想到的每一种组合),但是在这个小子集中的 24 条消息中,只有大约 5 条连接成功.
当前代码的当前示例(和略微简化):
val events = streams.createKTable<RawEvent>("raw_events)
val users = streams.createKStream<ProcessingUser>("processing_users)
val finalEvents = events
.join(users, eventsProcessor::enrichWithUsers)
.to("final_events")
鉴于 raw_events
和 processing_users
主题中存在对应的对 (1:1),是否可以解释为什么某些连接会成功而其他连接会失败?只有 5 对会始终如一地进入 final_events
主题(总是相同的对)。
欢迎任何额外的建议!
配置
为了详细起见,以下是有关设置的一些注意事项:
- 使用卡夫卡流 2.3.0
- 对于所有适用的物化调用,缓存和日志记录分别是 enabled/disabled
- 拓扑优化已启用
- 缓存缓冲设置为 0
更新
在花了几个小时仔细考虑并深入研究数据后,简而言之,这个问题似乎与分区有关。
五个一直成功的连接,只是看起来如此,因为键位于每个主题的相同分区上:
successful events raw_events partition processing_users partition
RK0juG0B9k3AiALz5QUw 3 3
m60juG0B9k3AiALz5gU5 7 7
ua0juG0B9k3AiALz7wqa 7 7
8KwjuG0B9k3AiALz1v58 8 8
RKwjuG0B9k3AiALz1P7C 9 9
尽管所有键都存在于两个主题中,但它们似乎并没有使用相同的策略进行分区(即两个主题 包含 所有具有相同键的消息,但有些可能出现在 raw_events
中的一个分区上,但在 processing_users
中出现在不同的分区上)如下面的 partition/count 表示所示:
值得强调的是,出现在 raw_events
主题中的消息是在上述流应用程序工作流之外生成的,这使我相信需要回答这些问题:
- 是否可以允许分区策略的责任完全落在流工作流的入口点上,假设它会导致跨分区的规范化分布? (例如,如果给定键在
raw_events
的分区 7 中,并且您将具有相同键的记录发送到 preprocessing_users
,它会落入分区 7?
- 如果是这样,这是一个合理的策略吗?或者有没有一种方法可以在不编写所有生产者和流应用程序使用的自定义分区程序的情况下强制执行此行为?
- 如果不是,是否可以采用现有主题(在这种情况下
raw_event
并基本上对整个主题重新分区,以便使用默认分区策略?
如对原始 post 的更新中所述,问题本身是 .NET Producer 应用程序之间分区策略差异的结果,默认情况下使用 consistent_random
分区策略,与使用 murmur2random
策略的默认 Java 流应用程序相反。
有几个选项可以解决这个问题,但在这种特殊情况下,最简单的方法是调整生产者以使用适当的策略:
// Set the default partitioning strategy
ProducerConfig.Partitioner = Partitioner.Murmur2Random;
另一种方法可能是编写一个 CustomPartitioner
class 来实现您首选的分区策略以模仿您的生产者。
我最近在 streams 应用程序中遇到了一个我以前 运行 没有遇到过的问题,很难找到与 keying/joins 相关的问题(更新后,分区).
我有两个主题(raw_events 和 processed_users),它们的键控相同,但是当我尝试对这两个主题执行连接时,只有 一些 的连接是成功的,尽管键控相同。
工作流程(上下文)
为简洁起见,应用程序的基本工作流程如下:
- 数据通过生产者流入
raw_event
主题。 - 一系列流应用程序侦听
raw_event
主题并根据一系列业务规则(例如 IP 地址、用户等)从中提取各种实体 - 从
raw_event
主题中识别的实体被放入preprocessing_{type}
主题中,其中包含有关提取的元数据和在raw_event
中找到的相关信息(例如,对于用户可能是帐户名、电子邮件等)。 这些主题中的项目由raw_event
键控。 - 另一系列流应用程序将收听各种
preprocessing_{type}
主题,并根据一系列 GlobalKTables 将这些主题加入其中,这些 GlobalKTables 代表给定实体final_{type}
的所有已知实例。对于成功的加入,来自final_{type}
的实例将使用来自raw_event/preprocessing_{type}
主题的新信息进行丰富;不成功的连接将指示给定类型的新实体,然后将其键入并放入final_{type}
主题中。preprocessing_{type}
的所有丰富实例都插入到processing_{type}
主题中,该主题包含实体的丰富(或新)实例以及创建它的元数据。最重要的是 -processed_{type}
主题中的项目仍然由raw_event
键入。 - 最后,一个流应用程序 运行s 并尝试通过加入
processing_{type}
来丰富raw_event
的原始实例,这将是相同的键控,并丰富raw_event
包含来自丰富实体的各种信息的实例,然后将其推送到final_event
主题。
问题
问题本身出现在上面的第 5 步(事件丰富)中,因为 raw_event
主题和 processing_users
主题之间只有一些连接按预期工作。
使用通过整个管道的 24 条记录的子集,主题中的 24 对中只有 5 对成功加入。那些有效的似乎是相同的一致的,但我没有在数据中看到任何可以表明为什么一个有效而另一个无效的内容:
raw_event keys processing_user keys
mawjuG0B9k3AiALz0_2S 0q0juG0B9k3AiALz8ApP
xEEcv20B9k3AiALzEN0m m60juG0B9k3AiALz5gU5
zqwjuG0B9k3AiALzz_tg ua0juG0B9k3AiALz7wqa
v60juG0B9k3AiALz6Aal xEEcv20B9k3AiALzEN0m
0q0juG0B9k3AiALz8ApP zqwjuG0B9k3AiALzz_tg
RK0juG0B9k3AiALz5QUw zK0juG0B9k3AiALz6Aal
0a0juG0B9k3AiALz6Aal Ta0juG0B9k3AiALz5QUw
8KwjuG0B9k3AiALz1v58 RKwjuG0B9k3AiALz1P7C
c60juG0B9k3AiALz5gU4 -60juG0B9k3AiALz3gGn
RKwjuG0B9k3AiALz1P7C Va0juG0B9k3AiALz5QUw
zK0juG0B9k3AiALz6Aal 560juG0B9k3AiALz3QGh
Ta0juG0B9k3AiALz5QUw mawjuG0B9k3AiALz0_2S
Va0juG0B9k3AiALz5QUw -K0juG0B9k3AiALz3QGh
pK0juG0B9k3AiALz5gU5 zq0juG0B9k3AiALz6Aal
Xa0juG0B9k3AiALz2QCh RK0juG0B9k3AiALz5QUw
560juG0B9k3AiALz3QGh v60juG0B9k3AiALz6Aal
-K0juG0B9k3AiALz3QGh Xa0juG0B9k3AiALz2QCh
-60juG0B9k3AiALz3gGn P60juG0B9k3AiALz5QUw
F60juG0B9k3AiALz3gKn pK0juG0B9k3AiALz5gU5
m60juG0B9k3AiALz5gU5 0a0juG0B9k3AiALz6Aal
zq0juG0B9k3AiALz6Aal 3K0juG0B9k3AiALz3QGh
ua0juG0B9k3AiALz7wqa 8KwjuG0B9k3AiALz1v58
3K0juG0B9k3AiALz3QGh F60juG0B9k3AiALz3gKn
P60juG0B9k3AiALz5QUw c60juG0B9k3AiALz5gU4
我已经尝试将主题作为 KStreams 和 KTables 进行组合(以及我能想到的每一种组合),但是在这个小子集中的 24 条消息中,只有大约 5 条连接成功.
当前代码的当前示例(和略微简化):
val events = streams.createKTable<RawEvent>("raw_events)
val users = streams.createKStream<ProcessingUser>("processing_users)
val finalEvents = events
.join(users, eventsProcessor::enrichWithUsers)
.to("final_events")
鉴于 raw_events
和 processing_users
主题中存在对应的对 (1:1),是否可以解释为什么某些连接会成功而其他连接会失败?只有 5 对会始终如一地进入 final_events
主题(总是相同的对)。
欢迎任何额外的建议!
配置
为了详细起见,以下是有关设置的一些注意事项:
- 使用卡夫卡流 2.3.0
- 对于所有适用的物化调用,缓存和日志记录分别是 enabled/disabled
- 拓扑优化已启用
- 缓存缓冲设置为 0
更新
在花了几个小时仔细考虑并深入研究数据后,简而言之,这个问题似乎与分区有关。
五个一直成功的连接,只是看起来如此,因为键位于每个主题的相同分区上:
successful events raw_events partition processing_users partition
RK0juG0B9k3AiALz5QUw 3 3
m60juG0B9k3AiALz5gU5 7 7
ua0juG0B9k3AiALz7wqa 7 7
8KwjuG0B9k3AiALz1v58 8 8
RKwjuG0B9k3AiALz1P7C 9 9
尽管所有键都存在于两个主题中,但它们似乎并没有使用相同的策略进行分区(即两个主题 包含 所有具有相同键的消息,但有些可能出现在 raw_events
中的一个分区上,但在 processing_users
中出现在不同的分区上)如下面的 partition/count 表示所示:
值得强调的是,出现在 raw_events
主题中的消息是在上述流应用程序工作流之外生成的,这使我相信需要回答这些问题:
- 是否可以允许分区策略的责任完全落在流工作流的入口点上,假设它会导致跨分区的规范化分布? (例如,如果给定键在
raw_events
的分区 7 中,并且您将具有相同键的记录发送到preprocessing_users
,它会落入分区 7? - 如果是这样,这是一个合理的策略吗?或者有没有一种方法可以在不编写所有生产者和流应用程序使用的自定义分区程序的情况下强制执行此行为?
- 如果不是,是否可以采用现有主题(在这种情况下
raw_event
并基本上对整个主题重新分区,以便使用默认分区策略?
如对原始 post 的更新中所述,问题本身是 .NET Producer 应用程序之间分区策略差异的结果,默认情况下使用 consistent_random
分区策略,与使用 murmur2random
策略的默认 Java 流应用程序相反。
有几个选项可以解决这个问题,但在这种特殊情况下,最简单的方法是调整生产者以使用适当的策略:
// Set the default partitioning strategy
ProducerConfig.Partitioner = Partitioner.Murmur2Random;
另一种方法可能是编写一个 CustomPartitioner
class 来实现您首选的分区策略以模仿您的生产者。