Pyspark 简单重新分区和 toPandas() 仅在 600,000 多行上无法完成
Pyspark simple re-partition and toPandas() fails to finish on just 600,000+ rows
我有 JSON 数据,我正在将这些数据读入具有多个字段的数据框中,根据两列对其进行重新分区,然后转换为 Pandas。
此作业在 EMR 上仅在 600,000 行数据上一直失败,并出现一些不明显的错误。我还增加了 spark 驱动程序的内存设置,但仍然没有看到任何解决方案。
这是我的 pyspark 代码:
enhDataDf = (
sqlContext
.read.json(sys.argv[1])
)
enhDataDf = (
enhDataDf
.repartition('column1', 'column2')
.toPandas()
)
enhDataDf = sqlContext.createDataFrame(enhDataDf)
enhDataDf = (
enhDataDf
.toJSON()
.saveAsTextFile(sys.argv[2])
)
我的spark设置如下:
conf = SparkConf().setAppName('myapp1')
conf.set('spark.yarn.executor.memoryOverhead', 8192)
conf.set('spark.executor.memory', 8192)
conf.set('spark.driver.memory', 8192)
sc = SparkContext(conf=conf)
我得到的错误是:
16/10/01 19:57:56 ERROR executor.CoarseGrainedExecutorBackend: Driver 172.31.58.76:37973 disassociated! Shutting down.
16/10/01 19:57:11 ERROR executor.CoarseGrainedExecutorBackend: Driver 172.31.58.76:42167 disassociated! Shutting down.
16/10/01 19:57:56 ERROR executor.CoarseGrainedExecutorBackend: Driver 172.31.58.76:37973 disassociated! Shutting down.
log4j:ERROR Could not read configuration file from URL [file:/etc/spark/conf/log4j.properties].
log4j:ERROR Ignoring configuration file [file:/etc/spark/conf/log4j.properties].
16/10/01 19:57:11 ERROR ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM
16/10/01 19:57:11 ERROR ApplicationMaster: User application exited with status 143
log4j:ERROR Could not read configuration file from URL [file:/etc/spark/conf/log4j.properties].
log4j:ERROR Ignoring configuration file [file:/etc/spark/conf/log4j.properties].
16/10/01 19:57:56 ERROR ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM
16/10/01 19:57:56 ERROR ApplicationMaster: User application exited with status 143
16/10/01 19:57:11 ERROR executor.CoarseGrainedExecutorBackend: Driver 172.31.58.76:42167 disassociated! Shutting down.
16/10/01 19:57:56 ERROR executor.CoarseGrainedExecutorBackend: Driver 172.31.58.76:37973 disassociated! Shutting down.
代码在大约 600,000 JSON 行上运行良好 - 即使有大量可用内存。然后,一直失败。
对正在发生的事情以及如何调试/修复这个问题有什么想法吗?
这让我想起了我的 Spark – Container exited with a non-zero exit code 143,当时我在 集群 模式下启动 PySpark 作业,这表明您的应用程序存在内存问题。
首先尝试确定哪些机器出现故障,驱动程序或执行程序,从而能够更好地定位您的动作 - 根据我的阅读,它应该是执行程序。
我看到你已经设置了memoryOverhead
配置,很好。现在让我们关注 memory
配置:
...running Python with Spark (PySPark), so all the code of mine runs off the heap. For that reason, I have to allocate “not much” memory (since this will cut the memory I am allowed to use from the total memory; i.e. that if the total memory I am allowed to use is 20G and I am requesting 12G, then 8G will be left for my Python application to use.
所以尝试减少那个属性,是的,减少它!
下一个目标:#cores!
也减少它,例如,如果您使用 8,则在执行程序中使用 4 and/or 驱动程序:
spark.executor.cores 4
spark.driver.cores 4
我认为问题出在您的代码的以下部分:
enhDataDf = (
enhDataDf
.repartition('column1', 'column2')
.toPandas()
)
.toPandas()
采集数据,当记录数增长时,会导致驱动失败。
根据您的评论,这正是您使用的管道。这意味着整个阶段不仅过时而且不正确。当收集数据并进一步并行化时,可以保证
创建的分区
.repartition('column1', 'column2')
将在您重新创建 Spark 时保留 DataFrame
:
sqlContext.createDataFrame(enhDataDf)
如果想按列写数据可以直接写:
(sqlContext
.read.json(sys.argv[1])
.repartition('column1', 'column2')
.write
.json(sys.argv[2]))
跳过中间过程 toPandas
并转换为 RDD。
根据您的评论:
如果 toPandas
有用,那么它将始终是管道中的限制因素,唯一直接的解决方案是扩展驱动程序节点。根据您对收集的数据应用的确切算法,您可以考虑其他选项:
- 重新实现您在 Spark 上使用的算法目前尚不可用。
- 考虑具有更好 SciPy 堆栈互操作性(如 Dask)的替代框架。
我有 JSON 数据,我正在将这些数据读入具有多个字段的数据框中,根据两列对其进行重新分区,然后转换为 Pandas。
此作业在 EMR 上仅在 600,000 行数据上一直失败,并出现一些不明显的错误。我还增加了 spark 驱动程序的内存设置,但仍然没有看到任何解决方案。
这是我的 pyspark 代码:
enhDataDf = (
sqlContext
.read.json(sys.argv[1])
)
enhDataDf = (
enhDataDf
.repartition('column1', 'column2')
.toPandas()
)
enhDataDf = sqlContext.createDataFrame(enhDataDf)
enhDataDf = (
enhDataDf
.toJSON()
.saveAsTextFile(sys.argv[2])
)
我的spark设置如下:
conf = SparkConf().setAppName('myapp1')
conf.set('spark.yarn.executor.memoryOverhead', 8192)
conf.set('spark.executor.memory', 8192)
conf.set('spark.driver.memory', 8192)
sc = SparkContext(conf=conf)
我得到的错误是:
16/10/01 19:57:56 ERROR executor.CoarseGrainedExecutorBackend: Driver 172.31.58.76:37973 disassociated! Shutting down.
16/10/01 19:57:11 ERROR executor.CoarseGrainedExecutorBackend: Driver 172.31.58.76:42167 disassociated! Shutting down.
16/10/01 19:57:56 ERROR executor.CoarseGrainedExecutorBackend: Driver 172.31.58.76:37973 disassociated! Shutting down.
log4j:ERROR Could not read configuration file from URL [file:/etc/spark/conf/log4j.properties].
log4j:ERROR Ignoring configuration file [file:/etc/spark/conf/log4j.properties].
16/10/01 19:57:11 ERROR ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM
16/10/01 19:57:11 ERROR ApplicationMaster: User application exited with status 143
log4j:ERROR Could not read configuration file from URL [file:/etc/spark/conf/log4j.properties].
log4j:ERROR Ignoring configuration file [file:/etc/spark/conf/log4j.properties].
16/10/01 19:57:56 ERROR ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM
16/10/01 19:57:56 ERROR ApplicationMaster: User application exited with status 143
16/10/01 19:57:11 ERROR executor.CoarseGrainedExecutorBackend: Driver 172.31.58.76:42167 disassociated! Shutting down.
16/10/01 19:57:56 ERROR executor.CoarseGrainedExecutorBackend: Driver 172.31.58.76:37973 disassociated! Shutting down.
代码在大约 600,000 JSON 行上运行良好 - 即使有大量可用内存。然后,一直失败。
对正在发生的事情以及如何调试/修复这个问题有什么想法吗?
这让我想起了我的 Spark – Container exited with a non-zero exit code 143,当时我在 集群 模式下启动 PySpark 作业,这表明您的应用程序存在内存问题。
首先尝试确定哪些机器出现故障,驱动程序或执行程序,从而能够更好地定位您的动作 - 根据我的阅读,它应该是执行程序。
我看到你已经设置了memoryOverhead
配置,很好。现在让我们关注 memory
配置:
...running Python with Spark (PySPark), so all the code of mine runs off the heap. For that reason, I have to allocate “not much” memory (since this will cut the memory I am allowed to use from the total memory; i.e. that if the total memory I am allowed to use is 20G and I am requesting 12G, then 8G will be left for my Python application to use.
所以尝试减少那个属性,是的,减少它!
下一个目标:#cores!
也减少它,例如,如果您使用 8,则在执行程序中使用 4 and/or 驱动程序:
spark.executor.cores 4
spark.driver.cores 4
我认为问题出在您的代码的以下部分:
enhDataDf = (
enhDataDf
.repartition('column1', 'column2')
.toPandas()
)
.toPandas()
采集数据,当记录数增长时,会导致驱动失败。
根据您的评论,这正是您使用的管道。这意味着整个阶段不仅过时而且不正确。当收集数据并进一步并行化时,可以保证
创建的分区.repartition('column1', 'column2')
将在您重新创建 Spark 时保留 DataFrame
:
sqlContext.createDataFrame(enhDataDf)
如果想按列写数据可以直接写:
(sqlContext
.read.json(sys.argv[1])
.repartition('column1', 'column2')
.write
.json(sys.argv[2]))
跳过中间过程 toPandas
并转换为 RDD。
根据您的评论:
如果 toPandas
有用,那么它将始终是管道中的限制因素,唯一直接的解决方案是扩展驱动程序节点。根据您对收集的数据应用的确切算法,您可以考虑其他选项:
- 重新实现您在 Spark 上使用的算法目前尚不可用。
- 考虑具有更好 SciPy 堆栈互操作性(如 Dask)的替代框架。