flink 作业没有跨机器分布
flink job is not distributed across machines
我在 Apache flink 中有一个小用例,它是一个批处理系统。我需要处理一组文件。每个文件的处理必须由一台机器处理。我有下面的代码。始终只占用一个任务槽,一个接一个地处理文件。我有 6 个节点(所以有 6 个任务管理器)并在每个节点中配置了 4 个任务槽。所以,我希望一次处理 24 个文件。
class MyMapPartitionFunction extends RichMapPartitionFunction[java.io.File, Int] {
override def mapPartition(
myfiles: java.lang.Iterable[java.io.File],
out:org.apache.flink.util.Collector[Int])
: Unit = {
var temp = myfiles.iterator()
while(temp.hasNext()){
val fp1 = getRuntimeContext.getDistributedCache.getFile("hadoopRun.sh")
val file = new File(temp.next().toURI)
Process(
"/bin/bash ./run.sh " + argumentsList(3)+ "/" + file.getName + " " + argumentsList(7) + "/" + file.getName + ".csv",
new File(fp1.getAbsoluteFile.getParent))
.lines
.foreach{println}
out.collect(1)
}
}
}
我以 ./bin/start-cluster.sh 命令启动了 flink,Web 用户界面显示它有 6 个任务管理器,24 个任务槽。
这些文件夹包含大约 49 个文件。当我在此集合上创建 mapPartition 时,我预计会跨越 49 个并行进程。但是,在我的基础架构中,它们都是一个接一个地处理的。这意味着只有一台机器(一个任务管理器)处理所有 49 个文件名。我想要的是,每个插槽配置 2 个任务,我希望同时处理 24 个文件。
任何指点肯定会对这里有所帮助。我在 flink-conf.yaml file
中有这些参数
jobmanager.heap.mb: 2048
taskmanager.heap.mb: 1024
taskmanager.numberOfTaskSlots: 4
taskmanager.memory.preallocate: false
parallelism.default: 24
提前致谢。有人可以告诉我哪里出错了吗?
正如 David 所描述的那样,问题是 env.fromCollection(Iterable[T])
创建了一个 DataSource
和一个非平行的 InputFormat
。因此,DataSource
以 1
的并行度执行。随后的运算符 (mapPartition
) 从源代码继承了这种并行性,因此它们可以被链接起来(这为我们节省了一次网络洗牌)。
解决此问题的方法是通过
显式重新平衡源 DataSet
env.fromCollection(folders).rebalance()
或在后续运算符 (mapPartition
) 中显式设置所需的并行度:
env.fromCollection(folders).mapPartition(...).setParallelism(49)
我在 Apache flink 中有一个小用例,它是一个批处理系统。我需要处理一组文件。每个文件的处理必须由一台机器处理。我有下面的代码。始终只占用一个任务槽,一个接一个地处理文件。我有 6 个节点(所以有 6 个任务管理器)并在每个节点中配置了 4 个任务槽。所以,我希望一次处理 24 个文件。
class MyMapPartitionFunction extends RichMapPartitionFunction[java.io.File, Int] {
override def mapPartition(
myfiles: java.lang.Iterable[java.io.File],
out:org.apache.flink.util.Collector[Int])
: Unit = {
var temp = myfiles.iterator()
while(temp.hasNext()){
val fp1 = getRuntimeContext.getDistributedCache.getFile("hadoopRun.sh")
val file = new File(temp.next().toURI)
Process(
"/bin/bash ./run.sh " + argumentsList(3)+ "/" + file.getName + " " + argumentsList(7) + "/" + file.getName + ".csv",
new File(fp1.getAbsoluteFile.getParent))
.lines
.foreach{println}
out.collect(1)
}
}
}
我以 ./bin/start-cluster.sh 命令启动了 flink,Web 用户界面显示它有 6 个任务管理器,24 个任务槽。
这些文件夹包含大约 49 个文件。当我在此集合上创建 mapPartition 时,我预计会跨越 49 个并行进程。但是,在我的基础架构中,它们都是一个接一个地处理的。这意味着只有一台机器(一个任务管理器)处理所有 49 个文件名。我想要的是,每个插槽配置 2 个任务,我希望同时处理 24 个文件。
任何指点肯定会对这里有所帮助。我在 flink-conf.yaml file
中有这些参数jobmanager.heap.mb: 2048
taskmanager.heap.mb: 1024
taskmanager.numberOfTaskSlots: 4
taskmanager.memory.preallocate: false
parallelism.default: 24
提前致谢。有人可以告诉我哪里出错了吗?
正如 David 所描述的那样,问题是 env.fromCollection(Iterable[T])
创建了一个 DataSource
和一个非平行的 InputFormat
。因此,DataSource
以 1
的并行度执行。随后的运算符 (mapPartition
) 从源代码继承了这种并行性,因此它们可以被链接起来(这为我们节省了一次网络洗牌)。
解决此问题的方法是通过
显式重新平衡源DataSet
env.fromCollection(folders).rebalance()
或在后续运算符 (mapPartition
) 中显式设置所需的并行度:
env.fromCollection(folders).mapPartition(...).setParallelism(49)