如何分散作业以免 运行 内存不足
How-to spread the job so as to not run out of memory
我正在尝试 运行 一些 spark 作业,但通常执行程序 运行 内存不足:
17/02/06 19:12:02 WARN TaskSetManager: Lost task 10.0 in stage 476.3 (TID 133250, 10.0.0.10): ExecutorLostFailure (executor 12 exited caused by one of the running tasks) Reason: Container marked as failed: container_1486378087852_0006_01_000019 on host: 10.0.0.10. Exit status: 52. Diagnostics: Exception from container-launch.
Container id: container_1486378087852_0006_01_000019
Exit code: 52
Stack trace: ExitCodeException exitCode=52:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:933)
因为我已经设置了spark.executor.memory=20480m
,我觉得这个作业应该不需要更多的内存来工作,所以我看到的另一个选择是增加分区的数量。
我试过:
>>> sqlContext.setConf("spark.sql.shuffle.partitions", u"2001")
>>> sqlContext.getConf("spark.sql.shuffle.partitions")
u'2001'
和
>>> all_users.repartition(2001)
然而,当我开始工作时,我仍然看到默认的 200 个分区:
>>> all_users.repartition(2001).show()
[Stage 526:(0 + 30) / 200][Stage 527:>(0 + 0) / 126][Stage 528:>(0 + 0) / 128]0]
我在 Azure HDInsight 上使用 PySpark 2.0.2。谁能指出我做错了什么?
编辑
根据下面的答案我试过了:
sqlContext.setConf('spark.sql.shuffle.partitions', 2001)
一开始,但没有用。但是,这有效:
sqlContext.setConf('spark.sql.files.maxPartitionBytes', 100000000)
all_users 是一个 sql 数据框。一个具体的例子是:
all_users = sqlContext.table('RoamPositions')\
.withColumn('prev_district_id', F.lag('district_id', 1).over(user_window))\
.withColumn('prev_district_name', F.lag('district_name', 1).over(user_window))\
.filter('prev_district_id IS NOT NULL AND prev_district_id != district_id')\
.select('timetag', 'imsi', 'prev_district_id', 'prev_district_name', 'district_id', 'district_name')
根据您的评论,您似乎是从外部源读取数据并在调用 repartition
之前使用了 window 函数。 Window 函数:
- 如果没有提供
partitionBy
子句,则将数据重新分区到单个分区。
- 如果您提供
partitionBy
子句,则使用标准随机播放机制。
这里好像是后一种。由于 spark.sql.shuffle.partition
的默认值为 200,因此在重新分区之前,您的数据将被混洗到 200 个分区中。如果你一直想要 2001,你应该在加载数据之前设置它
sqlContext.setConf("spark.sql.shuffle.partitions", u"2001")
all_users = ...
另外spark.sql.shuffle.partitions
不影响初始分区的数量。这些可以使用其他属性来控制:How to increase partitions of the sql result from HiveContext in spark sql
我正在尝试 运行 一些 spark 作业,但通常执行程序 运行 内存不足:
17/02/06 19:12:02 WARN TaskSetManager: Lost task 10.0 in stage 476.3 (TID 133250, 10.0.0.10): ExecutorLostFailure (executor 12 exited caused by one of the running tasks) Reason: Container marked as failed: container_1486378087852_0006_01_000019 on host: 10.0.0.10. Exit status: 52. Diagnostics: Exception from container-launch.
Container id: container_1486378087852_0006_01_000019
Exit code: 52
Stack trace: ExitCodeException exitCode=52:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:933)
因为我已经设置了spark.executor.memory=20480m
,我觉得这个作业应该不需要更多的内存来工作,所以我看到的另一个选择是增加分区的数量。
我试过:
>>> sqlContext.setConf("spark.sql.shuffle.partitions", u"2001")
>>> sqlContext.getConf("spark.sql.shuffle.partitions")
u'2001'
和
>>> all_users.repartition(2001)
然而,当我开始工作时,我仍然看到默认的 200 个分区:
>>> all_users.repartition(2001).show()
[Stage 526:(0 + 30) / 200][Stage 527:>(0 + 0) / 126][Stage 528:>(0 + 0) / 128]0]
我在 Azure HDInsight 上使用 PySpark 2.0.2。谁能指出我做错了什么?
编辑
根据下面的答案我试过了:
sqlContext.setConf('spark.sql.shuffle.partitions', 2001)
一开始,但没有用。但是,这有效:
sqlContext.setConf('spark.sql.files.maxPartitionBytes', 100000000)
all_users 是一个 sql 数据框。一个具体的例子是:
all_users = sqlContext.table('RoamPositions')\
.withColumn('prev_district_id', F.lag('district_id', 1).over(user_window))\
.withColumn('prev_district_name', F.lag('district_name', 1).over(user_window))\
.filter('prev_district_id IS NOT NULL AND prev_district_id != district_id')\
.select('timetag', 'imsi', 'prev_district_id', 'prev_district_name', 'district_id', 'district_name')
根据您的评论,您似乎是从外部源读取数据并在调用 repartition
之前使用了 window 函数。 Window 函数:
- 如果没有提供
partitionBy
子句,则将数据重新分区到单个分区。 - 如果您提供
partitionBy
子句,则使用标准随机播放机制。
这里好像是后一种。由于 spark.sql.shuffle.partition
的默认值为 200,因此在重新分区之前,您的数据将被混洗到 200 个分区中。如果你一直想要 2001,你应该在加载数据之前设置它
sqlContext.setConf("spark.sql.shuffle.partitions", u"2001")
all_users = ...
另外spark.sql.shuffle.partitions
不影响初始分区的数量。这些可以使用其他属性来控制:How to increase partitions of the sql result from HiveContext in spark sql