How to write an outer join function in Flink using connect?

I want to do an outer join on 2 data streams, preferably without putting them in a window (every coGroup example I have seen comes with a window).

I tried this:

    val controlStream = Flink.flinkEnv.fromElements(
      (1, "mex1", "stream1_feat1"),
      (1, "mex2", "stream1_feat2")
    ).keyBy(x => (x._1, x._2))

    val wordStream = Flink.flinkEnv.fromElements(
      (1, "mex1", "stream2_feat1"),
      (1, "mex3", "stream2_feat3")
    ).keyBy(x => (x._1, x._2))

    val filteredStream = controlStream
        .connect(wordStream)
        .flatMap(new ControlFunction)

////////////////////////////////////////////////////////////////////////

class ControlFunction extends RichCoFlatMapFunction[
    (Int, String, String),
    (Int, String, String),
    (Int, String, String, String)] {

    // outer join
    private var state1: ValueState[(Int, String, String)] = _
    private var state2: ValueState[(Int, String, String)] = _

    override def open(parameters: Configuration): Unit = {
      state1 = getRuntimeContext.getState(
        new ValueStateDescriptor[(Int, String, String)]("s1", createTypeInformation[(Int, String, String)]))

      state2 = getRuntimeContext.getState(
        new ValueStateDescriptor[(Int, String, String)]("s2", createTypeInformation[(Int, String, String)]))

    }

    override def flatMap1(value: (Int, String, String),
                          out: Collector[(Int, String, String, String)]): Unit = {

      val state2Value = state2.value

      if (state2Value != null) {
        println("inside map1 not null")
        state2.clear()
        out.collect((value._1, value._2, value._3, state2Value._3))
      } else {
        println("inside map1 null")
        state1.update(value)
        out.collect((value._1, value._2, value._3, "NA"))
      }
    }

    override def flatMap2(value: (Int, String, String),
                          out: Collector[(Int, String, String, String)]): Unit = {

      val state1Value = state1.value

      if (state1Value != null) {
        println("inside map2 not null")
        state1.clear()
        out.collect((value._1, value._2, state1Value._3, value._3))
      } else {
        println("inside map2 null")
        state2.update(value)
        out.collect((value._1, value._2, "NA", value._3))
      }
    }

  }

Which gives me:

5> (1,mex2,stream1_feat2,NA)
8> (1,mex1,stream1_feat1,NA)
2> (1,mex3,NA,stream2_feat3)
8> (1,mex1,stream1_feat1,stream2_feat1)

The record (1,mex1,stream1_feat1,NA) should not be produced. The result I want is an outer join:

5> (1,mex2,stream1_feat2,NA)
2> (1,mex3,NA,stream2_feat3)
8> (1,mex1,stream1_feat1,stream2_feat1)

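From the print statements I can see that the two flatMaps are invoked one after the other, so mex1 is emitted twice. How can I fix this?

Thanks in advance!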

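You can't expect a streaming outer join to behave the same way as a batch outer join. A batch outer join can fully scan both input tables, and it produces output rows containing nulls only when no matching record exists. With a streaming implementation, you can't know whether waiting will eventually bring a matching record.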

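Because they have no access to the future, stream processing applications are often forced to produce as output a stream whose results are updated as more information becomes available.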
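To make "results that are updated" concrete, here is a minimal sketch in plain Scala, with no Flink involved. All names in it (`Change`, `Insert`, `Retract`, `FromLeft`, `FromRight`, `UpdatingOuterJoin`) are hypothetical and only model the idea: a provisional NA row is emitted right away, and when the match arrives the provisional row is retracted and replaced by the joined row. It assumes at most one record per side per key.

```scala
import scala.collection.mutable

// Hypothetical model of an updating output stream (not a Flink API).
sealed trait Change
case class Insert(row: (Int, String, String, String)) extends Change
case class Retract(row: (Int, String, String, String)) extends Change

// Hypothetical wrappers saying which input a record came from.
sealed trait Event
case class FromLeft(rec: (Int, String, String)) extends Event
case class FromRight(rec: (Int, String, String)) extends Event

object UpdatingOuterJoin {
  type Rec = (Int, String, String)

  def join(events: Seq[Event]): Seq[Change] = {
    // Unmatched records seen so far, keyed like the keyBy in the question.
    val pendingLeft = mutable.Map.empty[(Int, String), Rec]
    val pendingRight = mutable.Map.empty[(Int, String), Rec]
    val out = mutable.ArrayBuffer.empty[Change]

    events.foreach {
      case FromLeft(l) =>
        pendingRight.remove((l._1, l._2)) match {
          case Some(r) =>
            // The match arrived: withdraw the provisional row, emit the joined one.
            out += Retract((r._1, r._2, "NA", r._3))
            out += Insert((l._1, l._2, l._3, r._3))
          case None =>
            pendingLeft((l._1, l._2)) = l
            out += Insert((l._1, l._2, l._3, "NA"))
        }
      case FromRight(r) =>
        pendingLeft.remove((r._1, r._2)) match {
          case Some(l) =>
            out += Retract((l._1, l._2, l._3, "NA"))
            out += Insert((l._1, l._2, l._3, r._3))
          case None =>
            pendingRight((r._1, r._2)) = r
            out += Insert((r._1, r._2, "NA", r._3))
        }
    }
    out.toSeq
  }
}
```

Seen this way, the output in the question is the same stream without the retraction: (1,mex1,stream1_feat1,NA) was a provisional result that the later joined row supersedes.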

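One thing you can do is wait for a while to see whether emitting a result containing NA would be wrong, but eventually you have to stop waiting and produce a result.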
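The waiting approach could be sketched with a `KeyedCoProcessFunction` and a processing-time timer. This is only a sketch under assumptions: it targets the Flink 1.x Scala DataStream API, the class name `TimedOuterJoin` and the `waitMs` parameter are made up for illustration, and it assumes at most one record per side per key (a second record on the same side would overwrite the first).

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical class: buffers an unmatched record and emits the NA row
// only if no match shows up within waitMs milliseconds.
class TimedOuterJoin(waitMs: Long) extends KeyedCoProcessFunction[
    (Int, String),
    (Int, String, String),
    (Int, String, String),
    (Int, String, String, String)] {

  type In = (Int, String, String)
  type Out = (Int, String, String, String)
  type Ctx = KeyedCoProcessFunction[(Int, String), In, In, Out]#Context
  type TimerCtx = KeyedCoProcessFunction[(Int, String), In, In, Out]#OnTimerContext

  private var left: ValueState[In] = _
  private var right: ValueState[In] = _
  private var timer: ValueState[java.lang.Long] = _

  override def open(parameters: Configuration): Unit = {
    left = getRuntimeContext.getState(
      new ValueStateDescriptor[In]("left", createTypeInformation[In]))
    right = getRuntimeContext.getState(
      new ValueStateDescriptor[In]("right", createTypeInformation[In]))
    timer = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("timer", classOf[java.lang.Long]))
  }

  override def processElement1(value: In, ctx: Ctx, out: Collector[Out]): Unit = {
    val other = right.value()
    if (other != null) {
      out.collect((value._1, value._2, value._3, other._3))
      cleanup(ctx)
    } else {
      left.update(value) // no output yet, wait for a possible match
      startTimer(ctx)
    }
  }

  override def processElement2(value: In, ctx: Ctx, out: Collector[Out]): Unit = {
    val other = left.value()
    if (other != null) {
      out.collect((value._1, value._2, other._3, value._3))
      cleanup(ctx)
    } else {
      right.update(value)
      startTimer(ctx)
    }
  }

  // The wait is over: whatever is still buffered has no match, so emit it with NA.
  override def onTimer(timestamp: Long, ctx: TimerCtx, out: Collector[Out]): Unit = {
    val l = left.value()
    val r = right.value()
    if (l != null) out.collect((l._1, l._2, l._3, "NA"))
    if (r != null) out.collect((r._1, r._2, "NA", r._3))
    left.clear(); right.clear(); timer.clear()
  }

  private def startTimer(ctx: Ctx): Unit = {
    val fireAt = ctx.timerService().currentProcessingTime() + waitMs
    ctx.timerService().registerProcessingTimeTimer(fireAt)
    timer.update(fireAt)
  }

  private def cleanup(ctx: Ctx): Unit = {
    val t = timer.value()
    if (t != null) ctx.timerService().deleteProcessingTimeTimer(t)
    left.clear(); right.clear(); timer.clear()
  }
}
```

With the streams from the question this would be wired in as `controlStream.connect(wordStream).process(new TimedOuterJoin(5000L))`. Be aware that with a bounded source such as `fromElements` the job may finish before the processing-time timers fire, in which case the NA rows would never appear; event-time timers with watermarks are an alternative worth considering.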

Note that Flink's Table API does have an outer join, but you'll notice that it is marked as "Result Updating", for the reasons above.