Apache Flink 创建了错误的计划

Apache Flink creates incorrect plan

我为 Apache Flink 创建了一个简单的作业,它使用 Gelly 提供的 PageRank 实现。

在本地,运行 在 IDE 内,一切都很好。但是,我尝试使用 JobManager Web 界面将 JAR 和我的 Job 提交到我机器上的 Flink 实例 运行。但是,Flink 并没有为 Job 获取正确的计划并执行 PageRank,而是提出并执行了一个非常奇怪的计划,它只计算图的顶点数。

我做了一些研究和调试,发现 Gelly 提供的 PageRank 实现开始计算图形的顶点数,当它没有作为参数提供给算法时:

if (numberOfVertices == 0) {
    numberOfVertices = network.numberOfVertices();
}

此计算意味着嵌入式作业。由于操作员是惰性的,因此不会触发任何计算。在 Flink 服务器中,首先要做的是获取作业计划。这是由特殊环境 OptimizerPlanEnvironment 完成的,它提供了以下 result 方法:

public JobExecutionResult execute(String jobName) throws Exception {
    Plan plan = createProgramPlan(jobName);
    this.optimizerPlan = compiler.compile(plan);

    // do not go on with anything now!
    throw new ProgramAbortException();
}

问题来自这里。一旦 ProgramAbortException 被抛出,程序 returns 计划计算到此为止。但是只计算了内部作业计划,因此从不计算或执行主要作业计划。

这是我使用的代码:

public class Job {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        Graph<Long, Double, Double> graph = Graph.fromDataSet(
            PageRankData.getDefaultEdgeDataSet(env), new VertexInit(), env);
        graph.run(new PageRank<Long>(0.85, 10)).print();
    }

    private static class VertexInit implements MapFunction<Long, Double> {
        @Override
        public Double map(Long value) throws Exception { return 1.0; }
    }
}

如果提供了顶点数,则执行例如graph.run(new PageRank<Long>(0.85, 5, 10)),没问题,plan计算正确,PageRank也计算出来了。

我的问题是:我做错了什么?或者这是 Flink 中的一些实际错误?

正如您所说,问题是 network.numberOfVertices 在顶点数据集上内部调用 count。这会触发一个独立的 Flink 作业来计算计数值。该值通常由 main 方法检索。但是,在 Web 客户端提交的情况下,这将不起作用,因为 OptimizerPlanEnvironment,它只允许编译单个 Flink 作业。该行为类似于也不支持急切计划执行的分离执行模式。

这是目前 Flink Web 客户端的一个限制。这种行为的原因是 Flink 不想阻塞等待 count 操作结果所必需的 Netty 通道处理程序线程。阻塞操作会耗尽线程池并使此会话的 Web 界面无响应,直到它被解除阻塞。