Apache Flink 创建了错误的计划
Apache Flink creates incorrect plan
我为 Apache Flink 创建了一个简单的作业,它使用 Gelly 提供的 PageRank 实现。
在本地,运行 在 IDE 内,一切都很好。但是,我尝试使用 JobManager Web 界面将 JAR 和我的 Job 提交到我机器上的 Flink 实例 运行。但是,Flink 并没有为 Job 获取正确的计划并执行 PageRank,而是提出并执行了一个非常奇怪的计划,它只计算图的顶点数。
我做了一些研究和调试,发现 Gelly 提供的 PageRank 实现开始计算图形的顶点数,当它没有作为参数提供给算法时:
if (numberOfVertices == 0) {
numberOfVertices = network.numberOfVertices();
}
此计算意味着嵌入式作业。由于操作员是惰性的,因此不会触发任何计算。在 Flink 服务器中,首先要做的是获取作业计划。这是由特殊环境 OptimizerPlanEnvironment
完成的,它提供了以下 result
方法:
public JobExecutionResult execute(String jobName) throws Exception {
Plan plan = createProgramPlan(jobName);
this.optimizerPlan = compiler.compile(plan);
// do not go on with anything now!
throw new ProgramAbortException();
}
问题来自这里。一旦 ProgramAbortException
被抛出,程序 returns 计划计算到此为止。但是只计算了内部作业计划,因此从不计算或执行主要作业计划。
这是我使用的代码:
public class Job {
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Double, Double> graph = Graph.fromDataSet(
PageRankData.getDefaultEdgeDataSet(env), new VertexInit(), env);
graph.run(new PageRank<Long>(0.85, 10)).print();
}
private static class VertexInit implements MapFunction<Long, Double> {
@Override
public Double map(Long value) throws Exception { return 1.0; }
}
}
如果提供了顶点数,则执行例如graph.run(new PageRank<Long>(0.85, 5, 10))
,没问题,plan计算正确,PageRank也计算出来了。
我的问题是:我做错了什么?或者这是 Flink 中的一些实际错误?
正如您所说,问题是 network.numberOfVertices
在顶点数据集上内部调用 count
。这会触发一个独立的 Flink 作业来计算计数值。该值通常由 main
方法检索。但是,在 Web 客户端提交的情况下,这将不起作用,因为 OptimizerPlanEnvironment
,它只允许编译单个 Flink 作业。该行为类似于也不支持急切计划执行的分离执行模式。
这是目前 Flink Web 客户端的一个限制。这种行为的原因是 Flink 不想阻塞等待 count
操作结果所必需的 Netty 通道处理程序线程。阻塞操作会耗尽线程池并使此会话的 Web 界面无响应,直到它被解除阻塞。
我为 Apache Flink 创建了一个简单的作业,它使用 Gelly 提供的 PageRank 实现。
在本地,运行 在 IDE 内,一切都很好。但是,我尝试使用 JobManager Web 界面将 JAR 和我的 Job 提交到我机器上的 Flink 实例 运行。但是,Flink 并没有为 Job 获取正确的计划并执行 PageRank,而是提出并执行了一个非常奇怪的计划,它只计算图的顶点数。
我做了一些研究和调试,发现 Gelly 提供的 PageRank 实现开始计算图形的顶点数,当它没有作为参数提供给算法时:
if (numberOfVertices == 0) {
numberOfVertices = network.numberOfVertices();
}
此计算意味着嵌入式作业。由于操作员是惰性的,因此不会触发任何计算。在 Flink 服务器中,首先要做的是获取作业计划。这是由特殊环境 OptimizerPlanEnvironment
完成的,它提供了以下 result
方法:
public JobExecutionResult execute(String jobName) throws Exception {
Plan plan = createProgramPlan(jobName);
this.optimizerPlan = compiler.compile(plan);
// do not go on with anything now!
throw new ProgramAbortException();
}
问题来自这里。一旦 ProgramAbortException
被抛出,程序 returns 计划计算到此为止。但是只计算了内部作业计划,因此从不计算或执行主要作业计划。
这是我使用的代码:
public class Job {
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Double, Double> graph = Graph.fromDataSet(
PageRankData.getDefaultEdgeDataSet(env), new VertexInit(), env);
graph.run(new PageRank<Long>(0.85, 10)).print();
}
private static class VertexInit implements MapFunction<Long, Double> {
@Override
public Double map(Long value) throws Exception { return 1.0; }
}
}
如果提供了顶点数,则执行例如graph.run(new PageRank<Long>(0.85, 5, 10))
,没问题,plan计算正确,PageRank也计算出来了。
我的问题是:我做错了什么?或者这是 Flink 中的一些实际错误?
正如您所说,问题是 network.numberOfVertices
在顶点数据集上内部调用 count
。这会触发一个独立的 Flink 作业来计算计数值。该值通常由 main
方法检索。但是,在 Web 客户端提交的情况下,这将不起作用,因为 OptimizerPlanEnvironment
,它只允许编译单个 Flink 作业。该行为类似于也不支持急切计划执行的分离执行模式。
这是目前 Flink Web 客户端的一个限制。这种行为的原因是 Flink 不想阻塞等待 count
操作结果所必需的 Netty 通道处理程序线程。阻塞操作会耗尽线程池并使此会话的 Web 界面无响应,直到它被解除阻塞。