Flink:执行管道时出错 java.util.concurrent.TimeoutException:期货在 [10000 毫秒] 后超时
Flink: Error executing pipeline java.util.concurrent.TimeoutException: Futures timed out after [10000 milliseconds]
我正在使用 Flink v1.4.0
。
我正在利用 Flink
的原生 Graph API (Gelly),我用它来处理 1200 万个数据点(边)。我目前正在 运行 通过 IntelliJ
使用 Flink
小型集群来完成我的作业,该小型集群在同一个 JVM 中执行所有 TaskManager 和 JobManager。
当我加载数据,有效地生成边缘时,就在我将 Flink
作业转为 运行 之前,我总是会遇到以下异常:
...
Connected to JobManager at Actor[akka://flink/user/jobmanager_1#XXXXXXXXXX] with leader session id XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX.
322062 [main] ERROR com.somecompany.some.domain.name.some.javaClass- Error executing pipeline
java.util.concurrent.TimeoutException: Futures timed out after [10000 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:223)
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:157)
at scala.concurrent.Await$$anonfun$ready.apply(package.scala:169)
at scala.concurrent.Await$$anonfun$ready.apply(package.scala:169)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
...
我确保通过 IntelliJ
编辑 运行 配置以添加:
-Dakka.client.timeout:600s
-Dakka.ask.timeout:600s
但异常仍然存在,我不知道是什么原因造成的。当我减少数据大小时,一切正常。
当我尝试通过 Flink UI
向 运行 提交相同的作业时,我在集群上安装了 Flink
的本地版本。在这种情况下,作业永远不会开始,我什至无法预览自动生成的运算符 DAG。当我减少要处理的数据量时,问题再次消失。我还更新了 flink-conf.yaml
以包含相同的 akka
配置属性,但这没有任何区别。
我该如何解决这个问题?
当 运行在 IntelliJ 中运行 Flink 作业时,依赖于 Flink 迷你集群。迷你集群不同于 运行ning Flink on Standalone、Yarn 或 Mesos,因为它依赖于单个 JVM。此外,mini-cluster 以多种方式进行了预配置,并且并非总是可以更改该配置(至少在某些设置方面)。
在将我的作业提交到集群时(不是在 运行通过迷你集群将它们连接时)我必须更改的一件事是我分配给作业管理器的堆内存的大小.这是必要的,因为加载要处理的数据不是我想要 运行 的 Flink 作业的一部分(这不是 Flink 的标准做法,这样做实际上是错误的)。通过增加 Job-Manager 的堆,我能够让我的工作达到 运行 但最终我不得不为我的 Flink Job 定义一个新的输入格式,以便让 Job-Manager 不必预加载数据执行——无论如何这不应该是它的责任。
对于手头的问题:无法通过 IntelliJ 完成(据我所知)向作业管理器分配堆内存,因此作业总是会失败。
我正在使用 Flink v1.4.0
。
我正在利用 Flink
的原生 Graph API (Gelly),我用它来处理 1200 万个数据点(边)。我目前正在 运行 通过 IntelliJ
使用 Flink
小型集群来完成我的作业,该小型集群在同一个 JVM 中执行所有 TaskManager 和 JobManager。
当我加载数据,有效地生成边缘时,就在我将 Flink
作业转为 运行 之前,我总是会遇到以下异常:
...
Connected to JobManager at Actor[akka://flink/user/jobmanager_1#XXXXXXXXXX] with leader session id XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX.
322062 [main] ERROR com.somecompany.some.domain.name.some.javaClass- Error executing pipeline
java.util.concurrent.TimeoutException: Futures timed out after [10000 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:223)
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:157)
at scala.concurrent.Await$$anonfun$ready.apply(package.scala:169)
at scala.concurrent.Await$$anonfun$ready.apply(package.scala:169)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
...
我确保通过 IntelliJ
编辑 运行 配置以添加:
-Dakka.client.timeout:600s
-Dakka.ask.timeout:600s
但异常仍然存在,我不知道是什么原因造成的。当我减少数据大小时,一切正常。
当我尝试通过 Flink UI
向 运行 提交相同的作业时,我在集群上安装了 Flink
的本地版本。在这种情况下,作业永远不会开始,我什至无法预览自动生成的运算符 DAG。当我减少要处理的数据量时,问题再次消失。我还更新了 flink-conf.yaml
以包含相同的 akka
配置属性,但这没有任何区别。
我该如何解决这个问题?
当 运行在 IntelliJ 中运行 Flink 作业时,依赖于 Flink 迷你集群。迷你集群不同于 运行ning Flink on Standalone、Yarn 或 Mesos,因为它依赖于单个 JVM。此外,mini-cluster 以多种方式进行了预配置,并且并非总是可以更改该配置(至少在某些设置方面)。
在将我的作业提交到集群时(不是在 运行通过迷你集群将它们连接时)我必须更改的一件事是我分配给作业管理器的堆内存的大小.这是必要的,因为加载要处理的数据不是我想要 运行 的 Flink 作业的一部分(这不是 Flink 的标准做法,这样做实际上是错误的)。通过增加 Job-Manager 的堆,我能够让我的工作达到 运行 但最终我不得不为我的 Flink Job 定义一个新的输入格式,以便让 Job-Manager 不必预加载数据执行——无论如何这不应该是它的责任。
对于手头的问题:无法通过 IntelliJ 完成(据我所知)向作业管理器分配堆内存,因此作业总是会失败。