如何使用connect在Flink中编写外连接函数?
How to write outer join function in Flink using connect?
我想对2个数据流做outer join,最好不要放在一个window中(我见过Cogroup
总是带一个window ).
我试过这个:
val controlStream = Flink.flinkEnv.fromElements(
(1, "mex1", "stream1_feat1"),
(1, "mex2", "stream1_feat2")
).keyBy(x => (x._1, x._2))
val wordStream = Flink.flinkEnv.fromElements(
(1, "mex1", "stream2_feat1"),
(1, "mex3", "stream2_feat3")
).keyBy(x => (x._1, x._2))
val filteredStream = controlStream
.connect(wordStream)
.flatMap(new ControlFunction)
////////////////////////////////////////////////////////////////////////
class ControlFunction extends RichCoFlatMapFunction[
(Int, String, String),
(Int, String, String),
(Int, String, String, String)] {
// outer join
private var state1: ValueState[(Int, String, String)] = _
private var state2: ValueState[(Int, String, String)] = _
override def open(parameters: Configuration): Unit = {
state1 = getRuntimeContext.getState(
new ValueStateDescriptor[(Int, String, String)]("s1", createTypeInformation[(Int, String, String)]))
state2 = getRuntimeContext.getState(
new ValueStateDescriptor[(Int, String, String)]("s2", createTypeInformation[(Int, String, String)]))
}
override def flatMap1(value: (Int, String, String),
out: Collector[(Int, String, String, String)]): Unit = {
val state2Value = state2.value
if (state2Value != null) {
println("inside map1 not null")
state2.clear()
out.collect((value._1, value._2, value._3, state2Value._3))
} else {
println("inside map1 null")
state1.update(value)
out.collect((value._1, value._2, value._3, "NA"))
}
}
override def flatMap2(value: (Int, String, String),
out: Collector[(Int, String, String, String)]): Unit = {
val state1Value = state1.value
if (state1Value != null) {
println("inside map2 not null")
state1.clear()
out.collect((value._1, value._2, state1Value._3, value._3))
} else {
println("inside map2 null")
state2.update(value)
out.collect((value._1, value._2, "NA", value._3))
}
}
}
哪个给了我:
5> (1,mex2,stream1_feat2,NA)
8> (1,mex1,stream1_feat1,NA)
2> (1,mex3,NA,stream2_feat3)
8> (1,mex1,stream1_feat1,stream2_feat1)
不应生成记录 (1,mex1,stream1_feat1,NA)
。
我想要实现的结果是外连接:
5> (1,mex2,stream1_feat2,NA)
2> (1,mex3,NA,stream2_feat3)
8> (1,mex1,stream1_feat1,stream2_feat1)
通过打印语句,发现2个flapMaps
是依次传递的,导致mex1
产生了两次,如何解决?
提前致谢!
您不能期望流式外部联接的行为方式与批处理外部联接相同。批量外连接可以完全扫描两个输入表,并且只有在匹配记录不存在时才会产生包含空值的输出行。使用流式实现,您无法知道等待是否最终会收到匹配的记录。
由于无法访问未来,流处理应用程序通常被迫生成一个流作为输出,其中包含随着更多信息可用而更新的结果。
您可以做的一件事是等待一段时间,看看发出包含 NA 的结果是否是错误的,但最终您必须停止等待并产生结果。
请注意,Flink 的 Table API 有一个外部连接,但您会注意到它被标记为 "Result Updating" 出于上述原因。
我想对2个数据流做outer join,最好不要放在一个window中(我见过Cogroup
总是带一个window ).
我试过这个:
val controlStream = Flink.flinkEnv.fromElements(
(1, "mex1", "stream1_feat1"),
(1, "mex2", "stream1_feat2")
).keyBy(x => (x._1, x._2))
val wordStream = Flink.flinkEnv.fromElements(
(1, "mex1", "stream2_feat1"),
(1, "mex3", "stream2_feat3")
).keyBy(x => (x._1, x._2))
val filteredStream = controlStream
.connect(wordStream)
.flatMap(new ControlFunction)
////////////////////////////////////////////////////////////////////////
class ControlFunction extends RichCoFlatMapFunction[
(Int, String, String),
(Int, String, String),
(Int, String, String, String)] {
// outer join
private var state1: ValueState[(Int, String, String)] = _
private var state2: ValueState[(Int, String, String)] = _
override def open(parameters: Configuration): Unit = {
state1 = getRuntimeContext.getState(
new ValueStateDescriptor[(Int, String, String)]("s1", createTypeInformation[(Int, String, String)]))
state2 = getRuntimeContext.getState(
new ValueStateDescriptor[(Int, String, String)]("s2", createTypeInformation[(Int, String, String)]))
}
override def flatMap1(value: (Int, String, String),
out: Collector[(Int, String, String, String)]): Unit = {
val state2Value = state2.value
if (state2Value != null) {
println("inside map1 not null")
state2.clear()
out.collect((value._1, value._2, value._3, state2Value._3))
} else {
println("inside map1 null")
state1.update(value)
out.collect((value._1, value._2, value._3, "NA"))
}
}
override def flatMap2(value: (Int, String, String),
out: Collector[(Int, String, String, String)]): Unit = {
val state1Value = state1.value
if (state1Value != null) {
println("inside map2 not null")
state1.clear()
out.collect((value._1, value._2, state1Value._3, value._3))
} else {
println("inside map2 null")
state2.update(value)
out.collect((value._1, value._2, "NA", value._3))
}
}
}
哪个给了我:
5> (1,mex2,stream1_feat2,NA)
8> (1,mex1,stream1_feat1,NA)
2> (1,mex3,NA,stream2_feat3)
8> (1,mex1,stream1_feat1,stream2_feat1)
不应生成记录 (1,mex1,stream1_feat1,NA)
。
我想要实现的结果是外连接:
5> (1,mex2,stream1_feat2,NA)
2> (1,mex3,NA,stream2_feat3)
8> (1,mex1,stream1_feat1,stream2_feat1)
通过打印语句,发现2个flapMaps
是依次传递的,导致mex1
产生了两次,如何解决?
提前致谢!
您不能期望流式外部联接的行为方式与批处理外部联接相同。批量外连接可以完全扫描两个输入表,并且只有在匹配记录不存在时才会产生包含空值的输出行。使用流式实现,您无法知道等待是否最终会收到匹配的记录。
由于无法访问未来,流处理应用程序通常被迫生成一个流作为输出,其中包含随着更多信息可用而更新的结果。
您可以做的一件事是等待一段时间,看看发出包含 NA 的结果是否是错误的,但最终您必须停止等待并产生结果。
请注意,Flink 的 Table API 有一个外部连接,但您会注意到它被标记为 "Result Updating" 出于上述原因。