Flink:广播操作员链接
Flink: Broadcasted Operator chaining
假设我有一个 events 的数据流,我想将它广播到一个(丰富的)地图运算符(map1),它链接到另一个(丰富的)地图运算符(地图 2)。两张地图的平行度是一样的。我想要的是 map1 的每个并行实例的输出转到 map2 的一个并行实例(即,两个映射之间没有广播)。这是我到目前为止所做的,但我不确定它在逻辑上是否正确。可以吗?
val trainedStream = events.broadcast.map(new Mapper1(...)).setParallelism(par)
trainedStream.startNewChain.map(new Mapper2(...)).setParallelism(par)
后续问题:map1 和 map2 的两个链式 subtasks/parallel 实例的 SubtaskIndex(从 RuntimeContext.getIndexOfThisSubtask 收到)是否相同?有办法检查吗?
代码在 Scala 中,但同样适用于 Java 我猜
只要有可能,链接就会在 Flink 中自动发生。因此,在您的示例中,仅使用
就足够了
val trainedStream = events.broadcast.map(new Mapper1(...)).map(new Mapper2(...))
然后我会在 env
上设置并行度。
顺便问一下,您确定要直播这些活动吗?默认情况下并行处理 Datastream
。广播事件非常不寻常,因为它们会根据并行度被处理多次。
Followup Question: Is the SubtaskIndex (received from RuntimeContext.getIndexOfThisSubtask) of two chained subtasks/parallel instances of map1 and map2 the same? Is there a way to check this?
链式运算符的子任务索引相同,因为它们位于同一任务中(因此它们甚至不能有不同的索引)。如果您有任务 mapper1 -> mapper2
.
,您可以看到链接成功
假设我有一个 events 的数据流,我想将它广播到一个(丰富的)地图运算符(map1),它链接到另一个(丰富的)地图运算符(地图 2)。两张地图的平行度是一样的。我想要的是 map1 的每个并行实例的输出转到 map2 的一个并行实例(即,两个映射之间没有广播)。这是我到目前为止所做的,但我不确定它在逻辑上是否正确。可以吗?
val trainedStream = events.broadcast.map(new Mapper1(...)).setParallelism(par)
trainedStream.startNewChain.map(new Mapper2(...)).setParallelism(par)
后续问题:map1 和 map2 的两个链式 subtasks/parallel 实例的 SubtaskIndex(从 RuntimeContext.getIndexOfThisSubtask 收到)是否相同?有办法检查吗?
代码在 Scala 中,但同样适用于 Java 我猜
只要有可能,链接就会在 Flink 中自动发生。因此,在您的示例中,仅使用
就足够了val trainedStream = events.broadcast.map(new Mapper1(...)).map(new Mapper2(...))
然后我会在 env
上设置并行度。
顺便问一下,您确定要直播这些活动吗?默认情况下并行处理 Datastream
。广播事件非常不寻常,因为它们会根据并行度被处理多次。
Followup Question: Is the SubtaskIndex (received from RuntimeContext.getIndexOfThisSubtask) of two chained subtasks/parallel instances of map1 and map2 the same? Is there a way to check this?
链式运算符的子任务索引相同,因为它们位于同一任务中(因此它们甚至不能有不同的索引)。如果您有任务 mapper1 -> mapper2
.