Flink side output for late data is missing

Here is my application code:

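import java.time.Duration

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// SensorReading, SensorSource and TemperatureMiner are defined elsewhere in the project.
object StreamingJob {
    def main(args: Array[String]): Unit = {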
        // set up the streaming execution environment
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        // assign event-time timestamps and watermarks (no out-of-orderness tolerated)
        val sensorData: DataStream[SensorReading] = env.addSource(new SensorSource).assignTimestampsAndWatermarks(
            WatermarkStrategy
                .forBoundedOutOfOrderness[SensorReading](Duration.ofSeconds(0))
                .withTimestampAssigner(new SerializableTimestampAssigner[SensorReading] {
                    override def extractTimestamp(t: SensorReading, l: Long): Long = t.timestamp
                })
        )

        val outputTag = OutputTag[SensorReading]("late-event")

        val minTemp: DataStream[String] = sensorData
            .map(r => {
                val celsius = (r.temperature - 32) * (5.0 / 9.0)
                SensorReading(r.id, r.timestamp, celsius)
            })
            .keyBy(_.id)
            .window(TumblingEventTimeWindows.of(Time.seconds(5)))
            .allowedLateness(Time.seconds(10))
            .sideOutputLateData(outputTag)
            // compute min temperature
            .process(new TemperatureMiner)

        val lateStream: DataStream[SensorReading] = minTemp.getSideOutput(outputTag)
        lateStream.map(r => s"late event: ${r.id}, ${r.timestamp}, ${r.temperature}").print()

        minTemp.print()

        // execute program
        env.execute("Flink Streaming Scala API Skeleton")
    }
}

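I am fairly sure that late data is arriving, because the logs show TemperatureMiner being invoked several times for the same window, which means late firings are being triggered.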

The problem is that lateStream never prints anything from the side output for that late data. Any idea why?

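The side output for late data from a window only receives elements that arrive so late that they fall outside the allowed lateness. Elements that are late but still within the allowed lateness cause the window to fire again and are not sent to the side output. Perhaps none of your late data was late enough.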
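
To make the rule concrete, here is a small stand-alone sketch in plain Scala (no Flink dependency) of how an element is treated depending on the current watermark. It is a simplified model, not Flink's actual code: the `Fate` type, `classify`, and the object name are hypothetical, and it assumes non-negative timestamps and tumbling windows with zero offset. The constants mirror the 5-second window and 10-second allowed lateness from the question.

```scala
// Simplified model of the event-time lateness rule; names are illustrative only.
object LatenessSketch {
  sealed trait Fate
  case object OnTime extends Fate     // window has not fired yet
  case object LateFiring extends Fate // window fires again; element is NOT side-output
  case object SideOutput extends Fate // past allowed lateness; goes to sideOutputLateData

  val windowSizeMs = 5000L       // TumblingEventTimeWindows.of(Time.seconds(5))
  val allowedLatenessMs = 10000L // allowedLateness(Time.seconds(10))

  def classify(eventTs: Long, watermark: Long): Fate = {
    // Assumes non-negative timestamps and a zero window offset.
    val windowStart = eventTs - (eventTs % windowSizeMs)
    val maxTimestamp = windowStart + windowSizeMs - 1
    if (maxTimestamp + allowedLatenessMs <= watermark) SideOutput
    else if (maxTimestamp <= watermark) LateFiring
    else OnTime
  }

  def main(args: Array[String]): Unit = {
    // The same element (timestamp 1000, window [0, 5000)) at three watermarks.
    println(classify(eventTs = 1000L, watermark = 3000L))
    println(classify(eventTs = 1000L, watermark = 8000L))
    println(classify(eventTs = 1000L, watermark = 15000L))
  }
}
```

With the settings in the question, an element only reaches the side output once the watermark is about 10 seconds past the end of its window. If the source never produces data that far behind, lowering the value passed to `allowedLateness` should make late events show up in `lateStream` sooner.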