Spark dense_rank window 函数 - 没有 partitionBy 子句
Spark dense_rank window function - without a partitionBy clause
我正在使用 Spark 1.6.2、Scala 2.10.5 和 Java 1.7。
我们的用例需要我们在不使用 partitionBy 子句的情况下对超过 2 亿行的数据集执行 dense_rank(),仅使用 orderBy 子句。这目前在 MSSQL 中运行,大约需要 30 分钟才能完成。
我已经在 Spark 中实现了如下所示的等效逻辑:
val df1 = hqlContext.read.format("jdbc").options(
Map("url" -> url, "driver" -> driver,
"dbtable" -> "(select * from OwnershipStandardization_PositionSequence_tbl) as ps")).load()
df1.cache()
val df1_drnk = df1.withColumn("standardizationId",denseRank().over(Window.orderBy("ownerObjectId","securityId","periodId")))
我正在以 Yarn-cluster 模式提交作业,如下所示。我有一个 2 节点 Hadoop 2.6 集群,每个集群有 4 个 vCore 和 32 GB 内存。
spark-submit --class com.spgmi.csd.OshpStdCarryOver --master yarn --deploy-mode cluster --conf spark.yarn.executor.memoryOverhead=3072 --num-executors 2 --executor-cores 3 --driver-memory 7g --executor-memory 16g --jars $SPARK_HOME/lib/datanucleus-api-jdo-3.2.6.jar,$SPARK_HOME/lib/datanucleus-core-3.2.10.jar,$SPARK_HOME/lib/datanucleus-rdbms-3.2.9.jar,/usr/share/java/sqljdbc_4.1/enu/sqljdbc41.jar --files $SPARK_HOME/conf/hive-site.xml $SPARK_HOME/lib/spark-poc2-14.0.0.jar
在日志中,我可以看到 table 来自 MSSQL 的大约 2 亿行在 15 分钟内被导入并缓存在 Spark 中。我看到在这个阶段之前使用了大约 5 GB 的内存,其中一个执行程序仍然有大约 6.2 GB 的空闲内存,另一个执行程序有 11 GB 的空闲内存。
但是,dense_rank() 的步骤总是在几分钟后失败并出现 "GC Overhead limit exceeded" 错误。正如您在上面的 spark-submit 命令中注意到的那样,我什至将驱动程序内存设置为 7g。但是,无济于事!
当然,我知道缺少 partitionBy 子句实际上在 Spark 中造成了麻烦。但是,不幸的是,这是我们需要处理的用例。
你能在这里说明更多吗?我错过了什么吗?除了在 Spark 中使用 dense_rank window 函数之外,还有其他选择吗?例如,使用本论坛其他地方的一位专家建议的 "zipWithIndex" 函数?据我所知,它会产生与 dense_rank 相同的结果 "zipWithIndex" 方法复制 row_number() 函数而不是 dense_rank 吗?
如有任何有用的建议,我们将不胜感激!
非常感谢!
这里有两个不同的问题:
您通过 JDBC 连接加载数据而不提供分区列或分区谓词。这使用单个执行程序线程加载所有数据。
这个问题通常很容易解决,要么使用现有的列之一,要么提供人工键。
您使用 window 个函数而没有 partitionBy
。结果,所有数据都重新洗牌到单个分区,在本地排序,并使用单个线程进行处理。
一般来说,没有通用的解决方案可以仅使用 Dataset
API 来解决这个问题,但您可以使用一些技巧:
创建反映所需记录排序的人工分区。我在
的回答中描述了这种方法
在您的情况下可以使用类似的方法,但它需要多步过程,相当于下面描述的过程。
使用关联方法,您可以对排序的 RDD
使用两个单独的扫描(应该可以在不从 Dataset
转换的情况下做类似的事情)和其他操作:
- 计算每个分区的部分结果(在您的情况下,给定分区的排名)。
- 收集所需的摘要(在您的情况下是分区边界和每个分区的累积排名值)。
- 执行第二次扫描以更正来自先前分区的部分聚合。
可以在
中找到此方法的一个示例,可以轻松调整以适合您的情况
我正在使用 Spark 1.6.2、Scala 2.10.5 和 Java 1.7。
我们的用例需要我们在不使用 partitionBy 子句的情况下对超过 2 亿行的数据集执行 dense_rank(),仅使用 orderBy 子句。这目前在 MSSQL 中运行,大约需要 30 分钟才能完成。
我已经在 Spark 中实现了如下所示的等效逻辑:
val df1 = hqlContext.read.format("jdbc").options(
Map("url" -> url, "driver" -> driver,
"dbtable" -> "(select * from OwnershipStandardization_PositionSequence_tbl) as ps")).load()
df1.cache()
val df1_drnk = df1.withColumn("standardizationId",denseRank().over(Window.orderBy("ownerObjectId","securityId","periodId")))
我正在以 Yarn-cluster 模式提交作业,如下所示。我有一个 2 节点 Hadoop 2.6 集群,每个集群有 4 个 vCore 和 32 GB 内存。
spark-submit --class com.spgmi.csd.OshpStdCarryOver --master yarn --deploy-mode cluster --conf spark.yarn.executor.memoryOverhead=3072 --num-executors 2 --executor-cores 3 --driver-memory 7g --executor-memory 16g --jars $SPARK_HOME/lib/datanucleus-api-jdo-3.2.6.jar,$SPARK_HOME/lib/datanucleus-core-3.2.10.jar,$SPARK_HOME/lib/datanucleus-rdbms-3.2.9.jar,/usr/share/java/sqljdbc_4.1/enu/sqljdbc41.jar --files $SPARK_HOME/conf/hive-site.xml $SPARK_HOME/lib/spark-poc2-14.0.0.jar
在日志中,我可以看到 table 来自 MSSQL 的大约 2 亿行在 15 分钟内被导入并缓存在 Spark 中。我看到在这个阶段之前使用了大约 5 GB 的内存,其中一个执行程序仍然有大约 6.2 GB 的空闲内存,另一个执行程序有 11 GB 的空闲内存。
但是,dense_rank() 的步骤总是在几分钟后失败并出现 "GC Overhead limit exceeded" 错误。正如您在上面的 spark-submit 命令中注意到的那样,我什至将驱动程序内存设置为 7g。但是,无济于事! 当然,我知道缺少 partitionBy 子句实际上在 Spark 中造成了麻烦。但是,不幸的是,这是我们需要处理的用例。
你能在这里说明更多吗?我错过了什么吗?除了在 Spark 中使用 dense_rank window 函数之外,还有其他选择吗?例如,使用本论坛其他地方的一位专家建议的 "zipWithIndex" 函数?据我所知,它会产生与 dense_rank 相同的结果 "zipWithIndex" 方法复制 row_number() 函数而不是 dense_rank 吗?
如有任何有用的建议,我们将不胜感激! 非常感谢!
这里有两个不同的问题:
您通过 JDBC 连接加载数据而不提供分区列或分区谓词。这使用单个执行程序线程加载所有数据。
这个问题通常很容易解决,要么使用现有的列之一,要么提供人工键。
您使用 window 个函数而没有
partitionBy
。结果,所有数据都重新洗牌到单个分区,在本地排序,并使用单个线程进行处理。一般来说,没有通用的解决方案可以仅使用
Dataset
API 来解决这个问题,但您可以使用一些技巧:创建反映所需记录排序的人工分区。我在
的回答中描述了这种方法在您的情况下可以使用类似的方法,但它需要多步过程,相当于下面描述的过程。
使用关联方法,您可以对排序的
RDD
使用两个单独的扫描(应该可以在不从Dataset
转换的情况下做类似的事情)和其他操作:- 计算每个分区的部分结果(在您的情况下,给定分区的排名)。
- 收集所需的摘要(在您的情况下是分区边界和每个分区的累积排名值)。
- 执行第二次扫描以更正来自先前分区的部分聚合。
可以在
中找到此方法的一个示例,可以轻松调整以适合您的情况