在 Flink Job 中传递两个流与 MainStreams 进行操作
Passing two streams to do operations with MainStreams in Flink Job
在Flink-Job中,目前我有两个流,一个是Kafka主题每分钟更新一次的主数据流,另一个是KeyedBroadcastProcessFunction的流程元素函数中使用的流(广播流),用于与主流数据进行一些计算.
现在我有一个新的需求,需要再添加一个与其他两个流在结构上完全不同的流。
1) Flink state必须要有的第三个stream怎么才能和主数据、broadcast state数据一起做计算?在 keyedBroadcastProcess 函数中?
2) 主数据可以有两个广播流吗?
3) 由于流数据是完全不同的数据,广播和第三数据流不会更频繁地更改,因此加入将不起作用。它类似于主数据,与主数据流一起用于计算
找不到任何解决方案,请帮助。请分享一些我可以参考的链接。
Flink 不提供任何类型的三输入过程函数。
您可以将两个广播流结合在一起(在广播它们之前)。我很欣赏它们是非常不同的类型,但是您总能找到使它们共存的方法。如果没有更自然的方法来统一这两种类型,您可以为此使用 Either 。要将两个不同的类型联合到一个流中,您可以这样做:
DataStream<String> strings = env.fromElements("one", "two", "three");
DataStream<Integer> ints = env.fromElements(1, 2, 3);
DataStream<Either<String, Integer>> stringsOnTheLeft = strings
.map(new MapFunction<String, Either<String, Integer>>() {
@Override
public Either<String, Integer> map(String s) throws Exception {
return Either.Left(s);
}
});
DataStream<Either<String, Integer>> intsOnTheRight = ints
.map(new MapFunction<Integer, Either<String, Integer>>() {
@Override
public Either<String, Integer> map(Integer i) throws Exception {
return Either.Right(i);
}
});
DataStream<Either<String, Integer>> stringsAndInts = stringsOnTheLeft.union(intsOnTheRight);
或者,如果您可以在不同的阶段将广播流应用于主流,那么您可以拥有两个 KeyedBroadcastProcessFunctions 的序列,其中一个的输出馈入另一个:
events
.keyBy(x -> x.foo)
.connect(broadcast1)
.process(new process1())
.keyBy(x -> x.foo)
.connect(broadcast2)
.process(new process2())
更新:
If we merge like this and broadcast, if any update comes to anyone
stream will update the broadcast state or it will create a new entry
in the broadcast state?
这完全在您的控制之下。广播状态始终是地图状态;我想您会选择某种直接的键来使用,所以您会得到类似 MapState<String, Either<T1, T2>>
的东西。映射状态像任何哈希图一样工作:如果你重复使用一个键,它将替换条目,如果你引入一个新键,它将创建一个新条目。
... how can [I] provide a key common to these to [broadcast] streams?
密钥不必相同,只需是相同类型即可。
在Flink-Job中,目前我有两个流,一个是Kafka主题每分钟更新一次的主数据流,另一个是KeyedBroadcastProcessFunction的流程元素函数中使用的流(广播流),用于与主流数据进行一些计算.
现在我有一个新的需求,需要再添加一个与其他两个流在结构上完全不同的流。
1) Flink state必须要有的第三个stream怎么才能和主数据、broadcast state数据一起做计算?在 keyedBroadcastProcess 函数中?
2) 主数据可以有两个广播流吗?
3) 由于流数据是完全不同的数据,广播和第三数据流不会更频繁地更改,因此加入将不起作用。它类似于主数据,与主数据流一起用于计算 找不到任何解决方案,请帮助。请分享一些我可以参考的链接。
Flink 不提供任何类型的三输入过程函数。
您可以将两个广播流结合在一起(在广播它们之前)。我很欣赏它们是非常不同的类型,但是您总能找到使它们共存的方法。如果没有更自然的方法来统一这两种类型,您可以为此使用 Either 。要将两个不同的类型联合到一个流中,您可以这样做:
DataStream<String> strings = env.fromElements("one", "two", "three");
DataStream<Integer> ints = env.fromElements(1, 2, 3);
DataStream<Either<String, Integer>> stringsOnTheLeft = strings
.map(new MapFunction<String, Either<String, Integer>>() {
@Override
public Either<String, Integer> map(String s) throws Exception {
return Either.Left(s);
}
});
DataStream<Either<String, Integer>> intsOnTheRight = ints
.map(new MapFunction<Integer, Either<String, Integer>>() {
@Override
public Either<String, Integer> map(Integer i) throws Exception {
return Either.Right(i);
}
});
DataStream<Either<String, Integer>> stringsAndInts = stringsOnTheLeft.union(intsOnTheRight);
或者,如果您可以在不同的阶段将广播流应用于主流,那么您可以拥有两个 KeyedBroadcastProcessFunctions 的序列,其中一个的输出馈入另一个:
events
.keyBy(x -> x.foo)
.connect(broadcast1)
.process(new process1())
.keyBy(x -> x.foo)
.connect(broadcast2)
.process(new process2())
更新:
If we merge like this and broadcast, if any update comes to anyone stream will update the broadcast state or it will create a new entry in the broadcast state?
这完全在您的控制之下。广播状态始终是地图状态;我想您会选择某种直接的键来使用,所以您会得到类似 MapState<String, Either<T1, T2>>
的东西。映射状态像任何哈希图一样工作:如果你重复使用一个键,它将替换条目,如果你引入一个新键,它将创建一个新条目。
... how can [I] provide a key common to these to [broadcast] streams?
密钥不必相同,只需是相同类型即可。