Apache Flink:逐步执行

Apache Flink: stepwise execution

由于性能测量,我想逐步执行为 Flink 编写的 Scala 程序,即

execute first operator; materialize result;
execute second operator; materialize result;
...

等等。原代码:

var filename = new String("<filename>")
var text = env.readTextFile(filename)
var counts = text.flatMap { _.toLowerCase.split("\W+") }.map { (_, 1) }.groupBy(0).sum(1)
counts.writeAsText("file://result.txt", WriteMode.OVERWRITE)
env.execute()

所以我希望 var counts = text.flatMap { _.toLowerCase.split("\W+") }.map { (_, 1) }.groupBy(0).sum(1) 的执行是逐步的。

在每个操作员之后调用 env.execute() 是正确的方法吗?

或者在每次操作后写入 /dev/null,即调用 counts.writeAsText("file:///home/username/dev/null", WriteMode.OVERWRITE) 然后调用 env.execute() 是更好的选择吗?为了这个目的,Flink 真的有类似 NullSink 的东西吗?

编辑: 我在集群上使用 Flink Scala Shell 并将应用程序设置为 parallelism=1 以执行上述代码。

Flink 默认使用流水线数据传输来提高作业执行的性能。但是,您也可以通过调用

强制批量传输数据
ExecutionEnvironment env = ...
env.getConfig().setExecutionMode(ExecutionMode.BATCH_FORCED);

这将分开两个运算符的执行(除非它们被链接)。您可以从日志文件中获取每个任务的执行时间或查看 Web 仪表板。请注意,这不适用于链式运算符,即具有相同并行性且不需要网络洗牌的运算符。此外,您应该意识到使用批量传输会增加程序的整体执行时间。我不认为在流水线数据处理器中真正分离运算符的执行时间是可能的。

在每个算子之后调用execute()是不行的,因为Flink还不支持将结果缓存到内存中。因此,如果执行运算符 2,则需要将运算符 1 的结果写入某个持久存储并再次读取,或者再次执行运算符 1。