在 Flink 中以 (X,Y) 为键的缓慢变化流丰富以 (X,Y) 为键的快速流

Enrich fast stream keyed by (X,Y) with a slowly change stream keyed by (X) in Flink

我需要用由 (userId, startTripTimestamp) 键入的快速变化 streamA 和由 (userId) 键入的缓慢变化 streamB 来丰富我的内容。

我使用 Flink 1.8 和 DataStream API。我考虑两种方法:

  1. 广播 streamB 并按用户 ID 和最近的时间戳加入流。它是否等同于 TableAPI 中的 DynamicTable?我可以看到此解决方案的一些缺点:streamB 需要适合每个工作节点的 RAM,它增加了整个 RAM 的利用率 streamB 需要存储在每个工作节点的 RAM 中。

  2. streamA 的状态概括为仅由 (userId) 键控的流,我们将其命名为 streamC,以便与 streamB 具有公共键。然后,我可以将 streamCstreamB 合并,按处理时间排序,并处理状态中的两种类型的事件。处理生成的流(处理函数中的更多代码)更复杂,但不会消耗那么多 RAM 以在所有节点上拥有所有 streamB。这个解决方案还有什么缺点或优点吗?

我也看到了这个提案https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API,里面说:

In general, most of these follow the pattern of joining a main stream of high throughput with one or several inputs of slowly changing or static data:

[...]

Join stream with slowly evolving data: This is very similar to the above case but the side input that we use for enriching is evolving over time. This can be done by waiting for some initial data to be available before processing the main input and the continuously ingesting new data into the internal side input structure as it arrives.

不幸的是,要实现此功能似乎还需要很长时间 https://issues.apache.org/jira/browse/FLINK-6131,并且没有描述任何替代方案。因此,我想问一下目前针对所描述的用例推荐的方法。

我看过,但它没有指定流的键是什么,而且在Flink 1.4的时候已经回答了,所以我预计推荐的解决方案可能已经改变了。

答案取决于您 streamB 状态的大小,需要用来丰富 streamA

  • 如果您广播您的 streamB 状态,那么您会将 streamB 中的所有用户 ID 放入每个任务管理器。任务管理器上的每个任务将只有来自 streamA 的这些 userIds 的一个子集。因此,streamB 中的某些 userId 数据将永远不会被使用,并且会被浪费掉。因此,如果您认为 streamB 状态的大小不足以真正影响您的工作并且不会占用大量内存来为状态管理留出较少的内存,则可以保留整个 streamB 状态。这是你的#1。
  • 如果您的 streamB 状态真的很大并且会消耗任务管理器上的大量内存,您应该考虑方法 #2。 KeyBy 相同的 Id 两个流以确保具有相同 userID 的元素到达相同的任务,然后您可以使用托管状态来维护每个键的 streamB 状态并使用此托管状态丰富 streamA 元素。

基于 Gaurav Kumar 已经回答的内容。

主要问题是您需要完全匹配来自 streamAstreamB 的记录还是尽力匹配?例如,这对你来说是个问题吗,由于竞争条件,来自 streamA 的一些(很多?)记录可以在来自 streamB 的一些更新到达之前被处理,例如在启动期间?

我建议从 Table API is solving this issue 中汲取灵感。可能 Temporal Table Join 是您的正确选择,这会让您做出选择:处理时间还是事件时间?

Gaurav Kumar 的两个提案都是实现 of processing time Temporal Table joins,它假定记录可以非常松散地连接并且不必正确计时。

如果来自 streamAstreamB 的记录必须正确计时,那么您必须以某种方式缓冲来自两个流的一些记录。有多种方法可以做到这一点,具体取决于 semantic 您想要实现的目标。确定之后,实际实现起来并不难,可以从Table API join operators (org.apache.flink.table.runtime.join package in flink-table-planner module)中得到启发。

辅助输入(您引用的)and/or 输入选择只是用于控制不必要的缓冲记录数量的工具。您可以在没有它们的情况下实现有效的 Flink 作业,但如果一个流显着超过另一个流(就事件时间而言 - 对于处理时间而言,这不是问题),则内存消耗可能难以控制。