Apache Spark 在工作开始前做什么
What is Apache Spark doing before a job start
我在 AWS EMR 上有一个 Apache Spark 批处理作业 运行。它从 AWS S3 中提取,使用该数据运行几个作业,然后将数据存储在 RDS 实例中。
但是,工作之间似乎有很长一段时间activity。
这是CPU的用途:
这是网络:
注意每列之间的间隙,它几乎与 activity 列的大小相同!
起初我以为这两列被移动了(当它从 S3 中拉出时,它没有使用很多 CPU 反之亦然)但后来我注意到这两个图表实际上遵循彼此。这是有道理的,因为 RDD 是惰性的,因此会拉 因为工作是 运行.
这引出了我的问题,那段时间 Spark 在做什么?在那段时间里,所有的 Ganglia 图表似乎都归零了。就好像集群决定在每个作业之前休息一下。
谢谢。
编辑:查看日志,这部分似乎需要一个小时...什么都不做?
15/04/27 01:13:13 INFO storage.DiskBlockManager: Created local directory at /mnt1/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1429892010439_0020/spark-c570e510-934c-4510-a1e5-aa85d407b748
15/04/27 01:13:13 INFO storage.MemoryStore: MemoryStore started with capacity 4.9 GB
15/04/27 01:13:13 INFO netty.NettyBlockTransferService: Server created on 37151
15/04/27 01:13:13 INFO storage.BlockManagerMaster: Trying to register BlockManager
15/04/27 01:13:13 INFO storage.BlockManagerMaster: Registered BlockManager
15/04/27 01:13:13 INFO util.AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@ip-10-0-3-12.ec2.internal:41461/user/HeartbeatReceiver
15/04/27 02:30:45 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 0
15/04/27 02:30:45 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 7
15/04/27 02:30:45 INFO executor.Executor: Running task 77251.0 in stage 0.0 (TID 0)
15/04/27 02:30:45 INFO executor.Executor: Running task 77258.0 in stage 0.0 (TID 7)
15/04/27 02:30:45 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 8
15/04/27 02:30:45 INFO executor.Executor: Running task 0.0 in stage 0.0 (TID 8)
15/04/27 02:30:45 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 15
15/04/27 02:30:45 INFO executor.Executor: Running task 7.0 in stage 0.0 (TID 15)
15/04/27 02:30:45 INFO broadcast.TorrentBroadcast: Started reading broadcast variable
注意 01:13:13
,它一直挂在那里直到 20:30:45
。
我发现了问题。问题在于我调用从 S3 拉取的方式。
我们在 S3 中的数据由 s3n://bucket/2015/01/03/10/40/actualData.txt
中的日期模式分隔,数据来自 2015 年 1 月 3 日 10:40
所以当我们想运行对整个集合进行批处理时,我们调用sc.textFiles("s3n://bucket/*/*/*/*/*/*")
。
但这很糟糕。回想起来,这是有道理的;对于每个星号 (*),Spark 需要获取 "directory" 中的所有文件,然后获取其下目录中的所有文件。一个月大约有 30 个文件,每天有 24 个文件,每个文件有 60 个。所以上面的模式会在每个星上调用 "list files" 并且返回文件上的调用列表文件,一直向下到分钟!这是为了最终可以获取所有的 **/acutalData.txt 文件,然后合并它们的所有 RDD。
当然,这真的很慢。所以答案是在代码中构建这些路径(所有日期的字符串列表。在我们的例子中,可以确定所有可能的日期)并将它们减少为逗号分隔的字符串,可以传递给 textFiles
.
如果在您的情况下您无法确定所有可能的路径,请考虑重组数据或构建尽可能多的路径并仅在路径末尾调用 *
,或者使用 AmazonS3Client 使用列表对象 api 获取所有密钥(这允许您非常快速地获取带有前缀的存储桶中的所有密钥),然后将它们作为逗号分隔的字符串传递到 textFiles
.它仍然会为每个文件调用 list Status
并且它仍然是串行的,但是调用会少很多。
然而,所有这些解决方案只会减缓不可避免的事情;随着越来越多的数据被构建,越来越多的列表状态调用将被串行调用。问题的根源似乎是 sc.textFiles(s3n://)
假装 s3 是一个文件系统,但实际上不是。它是一个键值存储。 Spark(和 Hadoop)需要一种不同的方式来处理不假定文件系统的 S3(以及可能的其他键值存储)。
我在 AWS EMR 上有一个 Apache Spark 批处理作业 运行。它从 AWS S3 中提取,使用该数据运行几个作业,然后将数据存储在 RDS 实例中。
但是,工作之间似乎有很长一段时间activity。
这是CPU的用途:
这是网络:
注意每列之间的间隙,它几乎与 activity 列的大小相同!
起初我以为这两列被移动了(当它从 S3 中拉出时,它没有使用很多 CPU 反之亦然)但后来我注意到这两个图表实际上遵循彼此。这是有道理的,因为 RDD 是惰性的,因此会拉 因为工作是 运行.
这引出了我的问题,那段时间 Spark 在做什么?在那段时间里,所有的 Ganglia 图表似乎都归零了。就好像集群决定在每个作业之前休息一下。
谢谢。
编辑:查看日志,这部分似乎需要一个小时...什么都不做?
15/04/27 01:13:13 INFO storage.DiskBlockManager: Created local directory at /mnt1/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1429892010439_0020/spark-c570e510-934c-4510-a1e5-aa85d407b748
15/04/27 01:13:13 INFO storage.MemoryStore: MemoryStore started with capacity 4.9 GB
15/04/27 01:13:13 INFO netty.NettyBlockTransferService: Server created on 37151
15/04/27 01:13:13 INFO storage.BlockManagerMaster: Trying to register BlockManager
15/04/27 01:13:13 INFO storage.BlockManagerMaster: Registered BlockManager
15/04/27 01:13:13 INFO util.AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@ip-10-0-3-12.ec2.internal:41461/user/HeartbeatReceiver
15/04/27 02:30:45 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 0
15/04/27 02:30:45 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 7
15/04/27 02:30:45 INFO executor.Executor: Running task 77251.0 in stage 0.0 (TID 0)
15/04/27 02:30:45 INFO executor.Executor: Running task 77258.0 in stage 0.0 (TID 7)
15/04/27 02:30:45 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 8
15/04/27 02:30:45 INFO executor.Executor: Running task 0.0 in stage 0.0 (TID 8)
15/04/27 02:30:45 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 15
15/04/27 02:30:45 INFO executor.Executor: Running task 7.0 in stage 0.0 (TID 15)
15/04/27 02:30:45 INFO broadcast.TorrentBroadcast: Started reading broadcast variable
注意 01:13:13
,它一直挂在那里直到 20:30:45
。
我发现了问题。问题在于我调用从 S3 拉取的方式。
我们在 S3 中的数据由 s3n://bucket/2015/01/03/10/40/actualData.txt
中的日期模式分隔,数据来自 2015 年 1 月 3 日 10:40
所以当我们想运行对整个集合进行批处理时,我们调用sc.textFiles("s3n://bucket/*/*/*/*/*/*")
。
但这很糟糕。回想起来,这是有道理的;对于每个星号 (*),Spark 需要获取 "directory" 中的所有文件,然后获取其下目录中的所有文件。一个月大约有 30 个文件,每天有 24 个文件,每个文件有 60 个。所以上面的模式会在每个星上调用 "list files" 并且返回文件上的调用列表文件,一直向下到分钟!这是为了最终可以获取所有的 **/acutalData.txt 文件,然后合并它们的所有 RDD。
当然,这真的很慢。所以答案是在代码中构建这些路径(所有日期的字符串列表。在我们的例子中,可以确定所有可能的日期)并将它们减少为逗号分隔的字符串,可以传递给 textFiles
.
如果在您的情况下您无法确定所有可能的路径,请考虑重组数据或构建尽可能多的路径并仅在路径末尾调用 *
,或者使用 AmazonS3Client 使用列表对象 api 获取所有密钥(这允许您非常快速地获取带有前缀的存储桶中的所有密钥),然后将它们作为逗号分隔的字符串传递到 textFiles
.它仍然会为每个文件调用 list Status
并且它仍然是串行的,但是调用会少很多。
然而,所有这些解决方案只会减缓不可避免的事情;随着越来越多的数据被构建,越来越多的列表状态调用将被串行调用。问题的根源似乎是 sc.textFiles(s3n://)
假装 s3 是一个文件系统,但实际上不是。它是一个键值存储。 Spark(和 Hadoop)需要一种不同的方式来处理不假定文件系统的 S3(以及可能的其他键值存储)。