相同键控主题无法加入 Kafka 流

Identically Keyed Topics Failing to Join in Kafka Streams

我最近在 streams 应用程序中遇到了一个我以前 运行 没有遇到过的问题,很难找到与 keying/joins 相关的问题(更新后,分区).

我有两个主题(raw_events 和 processed_users),它们的键控相同,但是当我尝试对这两个主题执行连接时,只有 一些 的连接是成功的,尽管键控相同。

工作流程(上下文)

为简洁起见,应用程序的基本工作流程如下:

  1. 数据通过生产者流入 raw_event 主题。
  2. 一系列流应用程序侦听 raw_event 主题并根据一系列业务规则(例如 IP 地址、用户等)从中提取各种实体
  3. raw_event 主题中识别的实体被放入 preprocessing_{type} 主题中,其中包含有关提取的元数据和在 raw_event 中找到的相关信息(例如,对于用户可能是帐户名、电子邮件等)。 这些主题中的项目由 raw_event 键控。
  4. 另一系列流应用程序将收听各种 preprocessing_{type} 主题,并根据一系列 GlobalKTables 将这些主题加入其中,这些 GlobalKTables 代表给定实体 final_{type} 的所有已知实例。对于成功的加入,来自 final_{type} 的实例将使用来自 raw_event/preprocessing_{type} 主题的新信息进行丰富;不成功的连接将指示给定类型的新实体,然后将其键入并放入 final_{type} 主题中。 preprocessing_{type} 的所有丰富实例都插入到 processing_{type} 主题中,该主题包含实体的丰富(或新)实例以及创建它的元数据。最重要的是 - processed_{type} 主题中的项目仍然由 raw_event 键入。
  5. 最后,一个流应用程序 运行s 并尝试通过加入 processing_{type} 来丰富 raw_event 的原始实例,这将是相同的键控,并丰富 raw_event 包含来自丰富实体的各​​种信息的实例,然后将其推送到 final_event 主题。

问题

问题本身出现在上面的第 5 步(事件丰富)中,因为 raw_event 主题和 processing_users 主题之间只有一些连接按预期工作。

使用通过整个管道的 24 条记录的子集,主题中的 24 对中只有 5 对成功加入。那些有效的似乎是相同的一致的,但我没有在数据中看到任何可以表明为什么一个有效而另一个无效的内容:

raw_event keys          processing_user keys
mawjuG0B9k3AiALz0_2S    0q0juG0B9k3AiALz8ApP 
xEEcv20B9k3AiALzEN0m    m60juG0B9k3AiALz5gU5 
zqwjuG0B9k3AiALzz_tg    ua0juG0B9k3AiALz7wqa 
v60juG0B9k3AiALz6Aal    xEEcv20B9k3AiALzEN0m 
0q0juG0B9k3AiALz8ApP    zqwjuG0B9k3AiALzz_tg 
RK0juG0B9k3AiALz5QUw    zK0juG0B9k3AiALz6Aal
0a0juG0B9k3AiALz6Aal    Ta0juG0B9k3AiALz5QUw 
8KwjuG0B9k3AiALz1v58    RKwjuG0B9k3AiALz1P7C 
c60juG0B9k3AiALz5gU4    -60juG0B9k3AiALz3gGn 
RKwjuG0B9k3AiALz1P7C    Va0juG0B9k3AiALz5QUw 
zK0juG0B9k3AiALz6Aal    560juG0B9k3AiALz3QGh 
Ta0juG0B9k3AiALz5QUw    mawjuG0B9k3AiALz0_2S 
Va0juG0B9k3AiALz5QUw    -K0juG0B9k3AiALz3QGh 
pK0juG0B9k3AiALz5gU5    zq0juG0B9k3AiALz6Aal 
Xa0juG0B9k3AiALz2QCh    RK0juG0B9k3AiALz5QUw 
560juG0B9k3AiALz3QGh    v60juG0B9k3AiALz6Aal 
-K0juG0B9k3AiALz3QGh    Xa0juG0B9k3AiALz2QCh 
-60juG0B9k3AiALz3gGn    P60juG0B9k3AiALz5QUw 
F60juG0B9k3AiALz3gKn    pK0juG0B9k3AiALz5gU5 
m60juG0B9k3AiALz5gU5    0a0juG0B9k3AiALz6Aal 
zq0juG0B9k3AiALz6Aal    3K0juG0B9k3AiALz3QGh 
ua0juG0B9k3AiALz7wqa    8KwjuG0B9k3AiALz1v58 
3K0juG0B9k3AiALz3QGh    F60juG0B9k3AiALz3gKn 
P60juG0B9k3AiALz5QUw    c60juG0B9k3AiALz5gU4 

我已经尝试将主题作为 KStreams 和 KTables 进行组合(以及我能想到的每一种组合),但是在这个小子集中的 24 条消息中,只有大约 5 条连接成功.

当前代码的当前示例(和略微简化):

val events = streams.createKTable<RawEvent>("raw_events)
val users = streams.createKStream<ProcessingUser>("processing_users)

val finalEvents = events
    .join(users, eventsProcessor::enrichWithUsers)
    .to("final_events")

鉴于 raw_eventsprocessing_users 主题中存在对应的对 (1:1),是否可以解释为什么某些连接会成功而其他连接会失败?只有 5 对会始终如一地进入 final_events 主题(总是相同的对)。

欢迎任何额外的建议!

配置

为了详细起见,以下是有关设置的一些注意事项:

更新

在花了几个小时仔细考虑并深入研究数据后,简而言之,这个问题似乎与分区有关。

五个一直成功的连接,只是看起来如此,因为键位于每个主题的相同分区上:

successful events       raw_events partition  processing_users partition
RK0juG0B9k3AiALz5QUw    3                     3
m60juG0B9k3AiALz5gU5    7                     7
ua0juG0B9k3AiALz7wqa    7                     7
8KwjuG0B9k3AiALz1v58    8                     8
RKwjuG0B9k3AiALz1P7C    9                     9

尽管所有键都存在于两个主题中,但它们似乎并没有使用相同的策略进行分区(即两个主题 包含 所有具有相同键的消息,但有些可能出现在 raw_events 中的一个分区上,但在 processing_users 中出现在不同的分区上)如下面的 partition/count 表示所示:

值得强调的是,出现在 raw_events 主题中的消息是在上述流应用程序工作流之外生成的,这使我相信需要回答这些问题:

如对原始 post 的更新中所述,问题本身是 .NET Producer 应用程序之间分区策略差异的结果,默认情况下使用 consistent_random 分区策略,与使用 murmur2random 策略的默认 Java 流应用程序相反。

有几个选项可以解决这个问题,但在这种特殊情况下,最简单的方法是调整生产者以使用适当的策略:

// Set the default partitioning strategy 
ProducerConfig.Partitioner = Partitioner.Murmur2Random;

另一种方法可能是编写一个 CustomPartitioner class 来实现您首选的分区策略以模仿您的生产者。