处理时间 windows 不适用于 Apache Flink 中的有限数据源
Processing time windows doesn't work on finite data sources in Apache Flink
我正在尝试将一个非常简单的 window 函数应用于 Apache Flink 中的有限数据流(本地,无集群)。这是示例:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env
.fromCollection(List("a", "b", "c", "d", "e"))
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
.trigger(ProcessingTimeTrigger.create)
.process(new ProcessAllWindowFunction[String, String, TimeWindow] {
override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
out.collect(elements.toList.sorted.toString())
}
})
.print()
env.execute()
在这里,我尝试在一秒钟内将所有到达 window 的元素分组,然后只打印这些组。
我假设所有元素都将在不到一秒的时间内生成并进入一个 window,因此 print()
中将有一个传入元素。但是,当我 运行 时 根本没有打印任何东西。
如果我删除所有 windowing 内容,例如
val env = StreamExecutionEnvironment.getExecutionEnvironment
env
.fromCollection(List("a", "b", "c", "d", "e"))
.print()
我看到在 运行 之后打印的元素。我也用文件源试过这个,没有区别。
我机器上的默认并行度是 6。如果我像这样试验并行度和延迟级别
val env = StreamExecutionEnvironment.createLocalEnvironment(2)
env
.fromCollection(List("a", "b", "c", "d", "e"))
.map { x => Thread.sleep(1500); x }
我能够将一些(不是全部)元素分组,然后打印出来。
我的第一个假设是源完成速度比 1 秒快得多,并且任务在 window 的计时器触发之前关闭。调试显示the timer setting line in ProcessingTimeTrigger
is reached. Shouldn't all started timers finish before a task shuts down (at least this is the impression I got from the code)?
能否请您帮助我理解这一点并使其更具确定性?
更新 #1,2018 年 9 月 23 日:
我还尝试使用事件时间 windows 而不是处理时间 windows。如果我这样做:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env
.fromCollection(List("a", "b", "c", "d", "e"))
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[String] {
override def extractAscendingTimestamp(element: String): Long = {
element.charAt(0).toInt
}
})
.windowAll(TumblingEventTimeWindows.of(Time.seconds(1)))
.trigger(EventTimeTrigger.create)
.process(new ProcessAllWindowFunction[String, String, TimeWindow] {
override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
out.collect(elements.toList.toString())
}
})
.print()
env.execute()
然后又没有打印任何内容。调试器显示触发器的 onElement
被每个元素调用,但 onEventTime
从未被调用。
此外,如果我修改时间戳提取器以进行更大的步骤:
element.charAt(0).toInt * 1000
打印所有元素(每组一个元素,这是预期的),除了最后一个。
更新 #2,2018 年 9 月 23 日:
更新 #1 在 中得到了回答。
当有限源到达末尾时,如果您正在使用事件时间,则将注入带有时间戳 Long.MAX_VALUE 的水印,这将导致所有事件时间计时器触发。但是,有了处理时间,Flink 会等待所有当前触发的定时器完成它们的动作,然后退出。
如您所料,您没有看到任何输出,因为源代码很快就完成了。
事件时间处理的确定性行为很简单;随着处理时间的增加,这并不是真正可以实现的。
但这里有一个或多或少有效的技巧:
val env = StreamExecutionEnvironment.getExecutionEnvironment
val s = env.fromCollection(List("a", "b", "c", "d", "e"))
val t = env.addSource((context: SourceContext[String]) => {
while(true) {
Thread.sleep(100)
context.collect("dummy")
}
})
s.union(t)
.filter(_ != "dummy")
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
.process(new ProcessAllWindowFunction[String, String, TimeWindow] {
override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
out.collect(elements.toList.sorted.toString())
}
})
.print()
env.execute()
大卫的回答很直接!
我尝试过使用 ProcessTime 或 GlobalWindows 处理有限流的方法。所有人都面临着如何正确结束这项工作的问题(source stop,operator processed all data,sink done)。因为 processtime 和 count window 只会让 window/data 像大卫的回答一样未处理。一种方法是同步源和操作员之间的通信,然后通知退出。但它并不美丽。因此,只需简单地选择 EventTime window,即使在第一次开始停止源后,它也会处理所有数据。
我正在尝试将一个非常简单的 window 函数应用于 Apache Flink 中的有限数据流(本地,无集群)。这是示例:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env
.fromCollection(List("a", "b", "c", "d", "e"))
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
.trigger(ProcessingTimeTrigger.create)
.process(new ProcessAllWindowFunction[String, String, TimeWindow] {
override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
out.collect(elements.toList.sorted.toString())
}
})
.print()
env.execute()
在这里,我尝试在一秒钟内将所有到达 window 的元素分组,然后只打印这些组。
我假设所有元素都将在不到一秒的时间内生成并进入一个 window,因此 print()
中将有一个传入元素。但是,当我 运行 时 根本没有打印任何东西。
如果我删除所有 windowing 内容,例如
val env = StreamExecutionEnvironment.getExecutionEnvironment
env
.fromCollection(List("a", "b", "c", "d", "e"))
.print()
我看到在 运行 之后打印的元素。我也用文件源试过这个,没有区别。
我机器上的默认并行度是 6。如果我像这样试验并行度和延迟级别
val env = StreamExecutionEnvironment.createLocalEnvironment(2)
env
.fromCollection(List("a", "b", "c", "d", "e"))
.map { x => Thread.sleep(1500); x }
我能够将一些(不是全部)元素分组,然后打印出来。
我的第一个假设是源完成速度比 1 秒快得多,并且任务在 window 的计时器触发之前关闭。调试显示the timer setting line in ProcessingTimeTrigger
is reached. Shouldn't all started timers finish before a task shuts down (at least this is the impression I got from the code)?
能否请您帮助我理解这一点并使其更具确定性?
更新 #1,2018 年 9 月 23 日:
我还尝试使用事件时间 windows 而不是处理时间 windows。如果我这样做:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env
.fromCollection(List("a", "b", "c", "d", "e"))
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[String] {
override def extractAscendingTimestamp(element: String): Long = {
element.charAt(0).toInt
}
})
.windowAll(TumblingEventTimeWindows.of(Time.seconds(1)))
.trigger(EventTimeTrigger.create)
.process(new ProcessAllWindowFunction[String, String, TimeWindow] {
override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
out.collect(elements.toList.toString())
}
})
.print()
env.execute()
然后又没有打印任何内容。调试器显示触发器的 onElement
被每个元素调用,但 onEventTime
从未被调用。
此外,如果我修改时间戳提取器以进行更大的步骤:
element.charAt(0).toInt * 1000
打印所有元素(每组一个元素,这是预期的),除了最后一个。
更新 #2,2018 年 9 月 23 日:
更新 #1 在
当有限源到达末尾时,如果您正在使用事件时间,则将注入带有时间戳 Long.MAX_VALUE 的水印,这将导致所有事件时间计时器触发。但是,有了处理时间,Flink 会等待所有当前触发的定时器完成它们的动作,然后退出。
如您所料,您没有看到任何输出,因为源代码很快就完成了。
事件时间处理的确定性行为很简单;随着处理时间的增加,这并不是真正可以实现的。
但这里有一个或多或少有效的技巧:
val env = StreamExecutionEnvironment.getExecutionEnvironment
val s = env.fromCollection(List("a", "b", "c", "d", "e"))
val t = env.addSource((context: SourceContext[String]) => {
while(true) {
Thread.sleep(100)
context.collect("dummy")
}
})
s.union(t)
.filter(_ != "dummy")
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
.process(new ProcessAllWindowFunction[String, String, TimeWindow] {
override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
out.collect(elements.toList.sorted.toString())
}
})
.print()
env.execute()
大卫的回答很直接! 我尝试过使用 ProcessTime 或 GlobalWindows 处理有限流的方法。所有人都面临着如何正确结束这项工作的问题(source stop,operator processed all data,sink done)。因为 processtime 和 count window 只会让 window/data 像大卫的回答一样未处理。一种方法是同步源和操作员之间的通信,然后通知退出。但它并不美丽。因此,只需简单地选择 EventTime window,即使在第一次开始停止源后,它也会处理所有数据。