使用 spark scala 比较两个大型数据集时出现内存不足问题
Out of memory issue when compare two large datasets using spark scala
我每天使用 Spark scala 程序将 1000 万条记录从 Mysql 导入 Hive,并比较昨天和今天的数据集。
val yesterdayDf=sqlContext.sql("select * from t_yesterdayProducts");
val todayDf=sqlContext.sql("select * from t_todayProducts");
val diffDf=todayDf.except(yesterdayDf);
我正在使用 3 节点集群和程序,可以很好地处理 400 万条记录。
对于超过 400 万,我们面临内存不足的问题,因为 RAM 内存不足。
我想知道比较两个大型数据集的最佳方法。
你有没有试过找出你有多少个分区:
yesterdayDf.rdd.partitions.size
将为您提供 yesterdayDf 数据帧的信息,您也可以对其他数据帧执行相同的操作。
您还可以使用
yesterdayDf.repartition(1000) // (a large number)
查看 OOM 问题是否消失。
这个问题的原因不好说。但问题可能是出于某种原因,工作人员获取了太多数据。尝试清除数据帧以执行除外操作。根据我在评论中的问题,你说你有关键列所以只取他们这样的:
val yesterdayDfKey = yesterdayDf.select("key-column")
val todayDfKey = todayDf.select("key-column")
val diffDf=todayDfKey.except(yesterdayDfKey);
有了它,您将获得一个带有键的数据框。比你可以用这样的连接做一个过滤器 .
您还需要确保您的 yarn.nodemanager.resource.memory-mb 大于您的 --executor-memory。
您也可以尝试使用 left_anti 连接键连接两个 df,然后检查记录数
我每天使用 Spark scala 程序将 1000 万条记录从 Mysql 导入 Hive,并比较昨天和今天的数据集。
val yesterdayDf=sqlContext.sql("select * from t_yesterdayProducts");
val todayDf=sqlContext.sql("select * from t_todayProducts");
val diffDf=todayDf.except(yesterdayDf);
我正在使用 3 节点集群和程序,可以很好地处理 400 万条记录。 对于超过 400 万,我们面临内存不足的问题,因为 RAM 内存不足。
我想知道比较两个大型数据集的最佳方法。
你有没有试过找出你有多少个分区:
yesterdayDf.rdd.partitions.size
将为您提供 yesterdayDf 数据帧的信息,您也可以对其他数据帧执行相同的操作。
您还可以使用
yesterdayDf.repartition(1000) // (a large number)
查看 OOM 问题是否消失。
这个问题的原因不好说。但问题可能是出于某种原因,工作人员获取了太多数据。尝试清除数据帧以执行除外操作。根据我在评论中的问题,你说你有关键列所以只取他们这样的:
val yesterdayDfKey = yesterdayDf.select("key-column")
val todayDfKey = todayDf.select("key-column")
val diffDf=todayDfKey.except(yesterdayDfKey);
有了它,您将获得一个带有键的数据框。比你可以用这样的连接做一个过滤器
您还需要确保您的 yarn.nodemanager.resource.memory-mb 大于您的 --executor-memory。
您也可以尝试使用 left_anti 连接键连接两个 df,然后检查记录数