后期数据丢失的 Flink 端输出
Flink side output for late data missing
这是我的申请代码
object StreamingJob {
def main(args: Array[String]) {
// set up the streaming execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// define EventTime and Watermark
var sensorData: DataStream[SensorReading] = env.addSource(new SensorSource).assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness[SensorReading](Duration.ofSeconds(0))
.withTimestampAssigner(new SerializableTimestampAssigner[SensorReading] {
override def extractTimestamp(t: SensorReading, l: Long): Long = t.timestamp
})
)
val outputTag = OutputTag[SensorReading]("late-event")
val minTemp: DataStream[String] = sensorData
.map(r => {
val celsius = (r.temperature - 32) * (5.0 / 9.0)
SensorReading(r.id, r.timestamp, celsius)
})
.keyBy(_.id)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(10))
.sideOutputLateData(outputTag)
// compute min temperature
.process(new TemperatureMiner)
val lateStream: DataStream[SensorReading] = minTemp.getSideOutput(outputTag)
lateStream.map(r => s"late event: ${r.id}, ${r.timestamp}, ${r.temperature}").print()
minTemp.print()
// execute program
env.execute("Flink Streaming Scala API Skeleton")
}
}
我非常确定捕获了延迟数据,因为我可以看到日志打印 TemperatureMiner
被一个 window 多次调用,因此触发延迟。
但问题是,对于延迟数据,lateStream 没有从侧面输出打印任何内容。知道为什么吗?
window 中延迟数据的侧输出仅是发送的数据太晚以至于超出了允许的延迟范围。也许 none 的延迟数据已经足够延迟了。
这是我的申请代码
object StreamingJob {
def main(args: Array[String]) {
// set up the streaming execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// define EventTime and Watermark
var sensorData: DataStream[SensorReading] = env.addSource(new SensorSource).assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness[SensorReading](Duration.ofSeconds(0))
.withTimestampAssigner(new SerializableTimestampAssigner[SensorReading] {
override def extractTimestamp(t: SensorReading, l: Long): Long = t.timestamp
})
)
val outputTag = OutputTag[SensorReading]("late-event")
val minTemp: DataStream[String] = sensorData
.map(r => {
val celsius = (r.temperature - 32) * (5.0 / 9.0)
SensorReading(r.id, r.timestamp, celsius)
})
.keyBy(_.id)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(10))
.sideOutputLateData(outputTag)
// compute min temperature
.process(new TemperatureMiner)
val lateStream: DataStream[SensorReading] = minTemp.getSideOutput(outputTag)
lateStream.map(r => s"late event: ${r.id}, ${r.timestamp}, ${r.temperature}").print()
minTemp.print()
// execute program
env.execute("Flink Streaming Scala API Skeleton")
}
}
我非常确定捕获了延迟数据,因为我可以看到日志打印 TemperatureMiner
被一个 window 多次调用,因此触发延迟。
但问题是,对于延迟数据,lateStream 没有从侧面输出打印任何内容。知道为什么吗?
window 中延迟数据的侧输出仅是发送的数据太晚以至于超出了允许的延迟范围。也许 none 的延迟数据已经足够延迟了。