Flink KeyedProcessFunction 与广播状态
Flink KeyedProcessFunction vs Broadcast state
我尝试为我的 flink 应用程序使用广播状态模式,但经过一些研究后我做了以下操作:
case class MyData(field1: String, field2: String, ts: Long, type: String) // type can be DATA or CONFIG
val stream1: DataStream[MyData] = ... // kinesis queue 1. main Data stream
val stream2: DataStream[MyData] = ... // kinesis queue 2. configuration stream
val union = stream1.union(stream2)
.keyBy(x => s"${x.field1}_${x.field2}")
.process(new MyProcessFun)
我在 MyProcessFun()
中读取数据,并根据来自 stream2
的数据对数据进行一些逻辑处理并发出一些元素。
基本上,我使用 stream2
之类的广播状态模式。我没有专门使用广播,因为没有简单的方法来访问我从 processBroadcastElement
获得的某些状态。
由于我的配置流被用作清洁状态的指示器,因此我的 MyProcessFun()
.
流是 .keyBy
,所以我预计并行度 > 1 不会出现问题。这是真的吗?
我的问题是什么情况下还需要Broadcast?
什么情况下需要使用广播模式?
由于在许多情况下,可以借助 .union()
、.connect() // with no broadcast
+ {Co}ProcessFunction()
.
来解决此类功能
这里有两件事要说。
首先,您似乎可以使用标准 KeyedCoProcess
函数来实现您现在使用 union
所做的事情。它实际上并没有太大区别,但是您可以为两个流设置单独的 类,以便更好的类型安全性和更好的域隔离。
至于 broadcast
,主要用例是当 control
流没有 keyBy
的键或者根本无法对 t/shouldn 进行分区时.
举一个例子,你可能有一些外部系统生成的事件,你想应用规则来过滤掉不满足规则要求的事件。您希望拥有动态规则,以便如果用户定义规则,它将立即用于过滤传入事件。为简单起见,我们假设规则适用于所有事件类型(例如,如果事件发生在给定日期的下午 5 点之后,则应将其过滤掉,或者如果事件持续超过 5 分钟,我们假设它无效)。你不能划分这样的规则,所以解决方案是 broadcast
.
或者,如果您想拥有一个系统,您可以在 real-time 中计算 driver 秒的交付总收入。你可能有一套额外的奖金(比如 driver 一小时内送货 10 次,就有 5% 的奖金)。您不想为每个 driver 创建一套单独的奖励规则,这样您就可以 keyBy
它,您会吗?:)
我尝试为我的 flink 应用程序使用广播状态模式,但经过一些研究后我做了以下操作:
case class MyData(field1: String, field2: String, ts: Long, type: String) // type can be DATA or CONFIG
val stream1: DataStream[MyData] = ... // kinesis queue 1. main Data stream
val stream2: DataStream[MyData] = ... // kinesis queue 2. configuration stream
val union = stream1.union(stream2)
.keyBy(x => s"${x.field1}_${x.field2}")
.process(new MyProcessFun)
我在 MyProcessFun()
中读取数据,并根据来自 stream2
的数据对数据进行一些逻辑处理并发出一些元素。
基本上,我使用 stream2
之类的广播状态模式。我没有专门使用广播,因为没有简单的方法来访问我从 processBroadcastElement
获得的某些状态。
由于我的配置流被用作清洁状态的指示器,因此我的 MyProcessFun()
.
流是 .keyBy
,所以我预计并行度 > 1 不会出现问题。这是真的吗?
我的问题是什么情况下还需要Broadcast?
什么情况下需要使用广播模式?
由于在许多情况下,可以借助 .union()
、.connect() // with no broadcast
+ {Co}ProcessFunction()
.
这里有两件事要说。
首先,您似乎可以使用标准 KeyedCoProcess
函数来实现您现在使用 union
所做的事情。它实际上并没有太大区别,但是您可以为两个流设置单独的 类,以便更好的类型安全性和更好的域隔离。
至于 broadcast
,主要用例是当 control
流没有 keyBy
的键或者根本无法对 t/shouldn 进行分区时.
举一个例子,你可能有一些外部系统生成的事件,你想应用规则来过滤掉不满足规则要求的事件。您希望拥有动态规则,以便如果用户定义规则,它将立即用于过滤传入事件。为简单起见,我们假设规则适用于所有事件类型(例如,如果事件发生在给定日期的下午 5 点之后,则应将其过滤掉,或者如果事件持续超过 5 分钟,我们假设它无效)。你不能划分这样的规则,所以解决方案是 broadcast
.
或者,如果您想拥有一个系统,您可以在 real-time 中计算 driver 秒的交付总收入。你可能有一套额外的奖金(比如 driver 一小时内送货 10 次,就有 5% 的奖金)。您不想为每个 driver 创建一套单独的奖励规则,这样您就可以 keyBy
它,您会吗?:)