Flink DataSet 程序运行多个作业
Flink DataSet program runs several jobs
我在 Apache Flink 中有以下代码。
当我执行它时,我的代码的某些部分是 运行 两次。
谁能告诉我为什么会这样?
DataSet input1 = ...
DataSet input2 = ...
List mappedInput1 = input1
.map(...)
.collect();
DataSet data = input1
.union(input1.filter(...))
.mapPartition(...);
data = data.union(data2).distinct();
data.flatMap(new MapFunc1(data.collect()));
data
.flatMap(new MapFunc2(input2.collect()))
.groupBy(0)
.sum(1)
.print();
每个 collect()
和 print()
语句急切地触发执行并将结果提取到客户端代码。每个这样的调用都会将整个程序追溯到数据源。
您的代码包含三个 collect()
和一个 print()
语句。因此,提交并执行了四个单独的程序。你应该看看 broadcast variables,而不是使用 collect()
。广播变量将一个数据集分发给一个运算符的每个并行实例。计算和分发发生在同一个程序中,而不是通过客户端程序进行路由。相反,数据直接在工人 运行 和操作员之间交换。
我在 Apache Flink 中有以下代码。 当我执行它时,我的代码的某些部分是 运行 两次。 谁能告诉我为什么会这样?
DataSet input1 = ...
DataSet input2 = ...
List mappedInput1 = input1
.map(...)
.collect();
DataSet data = input1
.union(input1.filter(...))
.mapPartition(...);
data = data.union(data2).distinct();
data.flatMap(new MapFunc1(data.collect()));
data
.flatMap(new MapFunc2(input2.collect()))
.groupBy(0)
.sum(1)
.print();
每个 collect()
和 print()
语句急切地触发执行并将结果提取到客户端代码。每个这样的调用都会将整个程序追溯到数据源。
您的代码包含三个 collect()
和一个 print()
语句。因此,提交并执行了四个单独的程序。你应该看看 broadcast variables,而不是使用 collect()
。广播变量将一个数据集分发给一个运算符的每个并行实例。计算和分发发生在同一个程序中,而不是通过客户端程序进行路由。相反,数据直接在工人 运行 和操作员之间交换。