Late event seems not to be dropped when doing an interval join between two streams

I am using Flink 1.11, and I have the following test case to try out an event-time interval join.

The data for the two streams is defined as follows:

object JoinStockInterval {
  // The stocks data.
  // ts is an implicit method that converts the time string to a timestamp.
  val stocks = Seq(
    Stock("id1", "2020-09-16 20:50:15".ts, 1),
    Stock("id1", "2020-09-16 20:50:12".ts, 2),
    Stock("id1", "2020-09-16 20:50:18".ts, 4),
    Stock("id1", "2020-09-16 20:50:11".ts, 3),
    Stock("id1", "2020-09-16 20:50:11".ts, 10),
    Stock("id1", "2020-09-16 20:50:13".ts, 5),
    Stock("id1", "2020-09-16 20:50:20".ts, 6),
    Stock("id1", "2020-09-16 20:50:14".ts, 7),
    Stock("id1", "2020-09-16 20:50:22".ts, 8),
    Stock("id1", "2020-09-16 20:50:40".ts, 9),
    Stock("id1", "2020-09-16 20:50:15".ts, 100)
  )

  // Mock the stock name changing over time
  val stockNameChangings = Seq(
    StockNameChanging("id1", "Stock1", "2020-09-16 20:50:16".ts),
    StockNameChanging("id1", "Stock101", "2020-09-16 20:50:20".ts),
    StockNameChanging("id1", "Stock4", "2020-09-16 20:50:17".ts),
    StockNameChanging("id1", "Stock7", "2020-09-16 20:50:21".ts),
    StockNameChanging("id1", "Stock5", "2020-09-16 20:50:17".ts),
    StockNameChanging("id1", "Stock501", "2020-09-16 20:50:22".ts),
    StockNameChanging("id1", "Stock6", "2020-09-16 20:50:23".ts)
  )

}
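The snippet above relies on Stock, StockNameChanging and the implicit ts helper, none of which are shown. For anyone reproducing the test, here is a minimal sketch of what they might look like. The field types and the TimeSyntax/TsOps names are assumptions; price is guessed to be a Double only because the join output prints 1.0, 4.0 and so on. With these definitions, the data object needs import TimeSyntax._ for .ts to resolve.

```scala
import java.sql.Timestamp

// Hypothetical definitions: the question does not show these types,
// so the field names and types here are assumptions.
case class Stock(id: String, trade_date: Timestamp, price: Double)
case class StockNameChanging(id: String, name: String, trade_date: Timestamp)

object TimeSyntax {
  // Hypothetical `ts` helper: parses "yyyy-MM-dd HH:mm:ss" with
  // java.sql.Timestamp.valueOf, i.e. in the JVM's default time zone.
  implicit class TsOps(s: String) {
    def ts: Timestamp = Timestamp.valueOf(s)
  }
}
```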

The test case is defined as follows; each stream allows 4 seconds of lateness:

 test("test interval join inner 2 works") {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val ds1 = env.addSource(new IntervalJoinStockSource(emitInterval = 0)).assignTimestampsAndWatermarks(new StockWatermarkGenerator(4000)) //allow 4 secs lateness
    val ds2 = env.addSource(new IntervalJoinStockNameChangingSource(emitInterval = 0)).assignTimestampsAndWatermarks(new StockNameChangingWatermarkGenerator(4000)) //allow 4 secs lateness
    val tenv = StreamTableEnvironment.create(env)
    tenv.createTemporaryView("s1", ds1, $"id", $"price", $"trade_date".rowtime() as "rt1")
    tenv.createTemporaryView("s2", ds2, $"id", $"name", $"trade_date".rowtime() as "rt2")
    tenv.from("s1").printSchema()
    tenv.from("s2").printSchema()
    val sql =
      """
      select s1.id, s2.name, s1.price, cast (s1.rt1 as timestamp) as rt1, s2.rt2
      from s1 join s2
      on s1.id = s2.id
      where s1.rt1 between s2.rt2 - interval '2' second and s2.rt2 + interval '2' second

      """.stripMargin(' ')

    tenv.sqlQuery(sql).toAppendStream[Row].print()
    env.execute()
  }

The join results are as follows:

id1,Stock1,1.0,2020-09-16T12:50:15,2020-09-16T12:50:16
id1,Stock1,4.0,2020-09-16T12:50:18,2020-09-16T12:50:16
id1,Stock101,4.0,2020-09-16T12:50:18,2020-09-16T12:50:20
id1,Stock4,4.0,2020-09-16T12:50:18,2020-09-16T12:50:17
id1,Stock4,1.0,2020-09-16T12:50:15,2020-09-16T12:50:17
id1,Stock5,4.0,2020-09-16T12:50:18,2020-09-16T12:50:17
id1,Stock5,1.0,2020-09-16T12:50:15,2020-09-16T12:50:17
id1,Stock101,6.0,2020-09-16T12:50:20,2020-09-16T12:50:20
id1,Stock7,6.0,2020-09-16T12:50:20,2020-09-16T12:50:21
id1,Stock501,6.0,2020-09-16T12:50:20,2020-09-16T12:50:22
id1,Stock1,7.0,2020-09-16T12:50:14,2020-09-16T12:50:16
id1,Stock101,8.0,2020-09-16T12:50:22,2020-09-16T12:50:20
id1,Stock501,8.0,2020-09-16T12:50:22,2020-09-16T12:50:22
id1,Stock7,8.0,2020-09-16T12:50:22,2020-09-16T12:50:21
id1,Stock6,8.0,2020-09-16T12:50:22,2020-09-16T12:50:23
id1,Stock1,100.0,2020-09-16T12:50:15,2020-09-16T12:50:16
id1,Stock4,100.0,2020-09-16T12:50:15,2020-09-16T12:50:17
id1,Stock5,100.0,2020-09-16T12:50:15,2020-09-16T12:50:17

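The strange part is the last records in the results above (those with price 100.0). They come from Stock("id1", "2020-09-16 20:50:15".ts, 100) in the stocks stream, but that record is already late in the stocks stream, as the following two consecutive records show. I have been stuck on this for several days: why is this record not dropped, and why does it still join successfully with the other stream (the name-changing stream)?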

    Stock("id1", "2020-09-16 20:50:40".ts, 9),
    Stock("id1", "2020-09-16 20:50:15".ts, 100)

The watermark strategy uses AssignerWithPunctuatedWatermarks.
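To see why the record looks late, here is a Flink-free sketch that models one stream's punctuated watermarks. It assumes StockWatermarkGenerator(4000) emits "largest timestamp seen so far minus 4 seconds" after every element (the generator's code is not shown), and it uses seconds after 20:50:00 instead of full timestamps. PunctuatedModel and isLate are hypothetical names, not Flink APIs.

```scala
// Simplified model of a punctuated watermark generator (not Flink code).
// Assumption: the generator emits "max timestamp seen so far - lateness"
// after every element. Times are seconds after 20:50:00.
object PunctuatedModel {
  // Watermark emitted right after each element, in arrival order.
  def watermarks(timestamps: Seq[Long], lateness: Long): Seq[Long] =
    timestamps
      .scanLeft(Long.MinValue)((maxSoFar, t) => math.max(maxSoFar, t))
      .tail
      .map(_ - lateness)

  // An element is late for its own stream if its timestamp is at or below
  // the watermark emitted after the previous element.
  def isLate(timestamps: Seq[Long], lateness: Long, i: Int): Boolean =
    i > 0 && timestamps(i) <= watermarks(timestamps, lateness)(i - 1)
}
```

With the stocks timestamps 15, 12, 18, 11, 11, 13, 20, 14, 22, 40, 15, the watermark emitted after the 20:50:40 record is 36, so the final 20:50:15 record is behind its own stream's watermark.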

The record you are asking about,

Stock("id1", "2020-09-16 20:50:15".ts, 100)

is not late from the join's point of view.

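The reason has to do with how watermarks propagate when an operator has multiple inputs, as this interval join does. The current watermark of the join operator is always the minimum of the watermarks it has received so far across all of its input channels.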
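That rule can be sketched as a small model under the same assumptions as the question's setup: each input's watermark is its largest timestamp minus 4 seconds, and times are seconds after 20:50:00. MultiInputWatermark is a hypothetical helper, not a Flink API.

```scala
// Simplified model of watermark propagation into a two-input operator.
// Assumption: each input's watermark is its largest timestamp minus the
// allowed lateness. Times are seconds after 20:50:00.
object MultiInputWatermark {
  def inputWatermark(seen: Seq[Long], lateness: Long): Long =
    seen.max - lateness

  // The operator's watermark is the minimum of the latest watermark
  // received on each input, so the slowest input holds it back.
  def operatorWatermark(inputWatermarks: Seq[Long]): Long =
    inputWatermarks.min
}
```

Feeding it all stocks records up to 20:50:40 gives 36 for the stocks input, while the name changes up to Stock7 (16, 20, 17, 21) give 17, so the join operator's watermark is 17.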

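So until the join has processed this record

StockNameChanging("id1", "Stock501", "2020-09-16 20:50:22".ts)

the watermark at the join is determined by this record

StockNameChanging("id1", "Stock7", "2020-09-16 20:50:21".ts)

With 4 seconds of allowed lateness, that puts the join's watermark at about 20:50:17 (assuming the generator emits the largest timestamp seen minus 4 seconds). The watermark is therefore still within the interval defined for the join, and the record at 20:50:15 can still be matched with the name changes at 20:50:16 and 20:50:17.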
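The following sketch checks which buffered name changes the 20:50:15 record can still meet. Its cleanup rule (keep a name change until the join watermark passes rt2 + 2 seconds) is a simplified assumption about how an interval join expires state, not Flink's exact implementation; IntervalJoinModel and its methods are hypothetical. The name changes received before Stock501 are treated as buffered.

```scala
// Simplified model of an event-time interval join (not Flink code).
// Times are seconds after 20:50:00.
object IntervalJoinModel {
  final case class NameChange(name: String, rt2: Long)

  // Join condition from the query: rt1 BETWEEN rt2 - bound AND rt2 + bound.
  def matches(rt1: Long, rt2: Long, bound: Long): Boolean =
    rt1 >= rt2 - bound && rt1 <= rt2 + bound

  // Assumed cleanup rule: a buffered right-side row is kept until the join
  // watermark passes rt2 + bound, because until then a left row that is
  // not late could still match it.
  def stillBuffered(rt2: Long, bound: Long, joinWatermark: Long): Boolean =
    rt2 + bound >= joinWatermark

  // Names of the buffered rows that a left row at rt1 joins with.
  def partners(rt1: Long, buffered: Seq[NameChange], bound: Long, joinWatermark: Long): Seq[String] =
    buffered
      .filter(r => stillBuffered(r.rt2, bound, joinWatermark) && matches(rt1, r.rt2, bound))
      .map(_.name)
}
```

With a join watermark of 17 this yields Stock1, Stock4 and Stock5, the same three partners as the last three output rows; if the join used the stocks stream's watermark of 36 alone, nothing would be left to match.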

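Watermarks work this way because each one is an assertion that the stream can now be considered complete up to the watermark's timestamp. From the join's point of view, it can only rely on the watermark of whichever input stream is furthest behind, since that is the only point up to which both inputs are known to be complete.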