Spark dense_rank window 函数 - 没有 partitionBy 子句

Spark dense_rank window function - without a partitionBy clause

我正在使用 Spark 1.6.2、Scala 2.10.5 和 Java 1.7。

我们的用例需要我们在不使用 partitionBy 子句的情况下对超过 2 亿行的数据集执行 dense_rank(),仅使用 orderBy 子句。这目前在 MSSQL 中运行,大约需要 30 分钟才能完成。

我已经在 Spark 中实现了如下所示的等效逻辑:

val df1 = hqlContext.read.format("jdbc").options(
  Map("url" -> url, "driver" -> driver,
  "dbtable" -> "(select * from OwnershipStandardization_PositionSequence_tbl) as ps")).load()

df1.cache()

val df1_drnk = df1.withColumn("standardizationId",denseRank().over(Window.orderBy("ownerObjectId","securityId","periodId")))

我正在以 Yarn-cluster 模式提交作业,如下所示。我有一个 2 节点 Hadoop 2.6 集群,每个集群有 4 个 vCore 和 32 GB 内存。

spark-submit --class com.spgmi.csd.OshpStdCarryOver --master  yarn --deploy-mode cluster --conf spark.yarn.executor.memoryOverhead=3072 --num-executors 2 --executor-cores 3 --driver-memory 7g --executor-memory 16g --jars $SPARK_HOME/lib/datanucleus-api-jdo-3.2.6.jar,$SPARK_HOME/lib/datanucleus-core-3.2.10.jar,$SPARK_HOME/lib/datanucleus-rdbms-3.2.9.jar,/usr/share/java/sqljdbc_4.1/enu/sqljdbc41.jar --files $SPARK_HOME/conf/hive-site.xml $SPARK_HOME/lib/spark-poc2-14.0.0.jar

在日志中,我可以看到 table 来自 MSSQL 的大约 2 亿行在 15 分钟内被导入并缓存在 Spark 中。我看到在这个阶段之前使用了大约 5 GB 的内存,其中一个执行程序仍然有大约 6.2 GB 的空闲内存,另一个执行程序有 11 GB 的空闲内存。

但是,dense_rank() 的步骤总是在几分钟后失败并出现 "GC Overhead limit exceeded" 错误。正如您在上面的 spark-submit 命令中注意到的那样,我什至将驱动程序内存设置为 7g。但是,无济于事! 当然,我知道缺少 partitionBy 子句实际上在 Spark 中造成了麻烦。但是,不幸的是,这是我们需要处理的用例。

你能在这里说明更多吗?我错过了什么吗?除了在 Spark 中使用 dense_rank window 函数之外,还有其他选择吗?例如,使用本论坛其他地方的一位专家建议的 "zipWithIndex" 函数?据我所知,它会产生与 dense_rank 相同的结果 "zipWithIndex" 方法复制 row_number() 函数而不是 dense_rank 吗?

如有任何有用的建议,我们将不胜感激! 非常感谢!

这里有两个不同的问题:

  • 您通过 JDBC 连接加载数据而不提供分区列或分区谓词。这使用单个执行程序线程加载所有数据。

    这个问题通常很容易解决,要么使用现有的列之一,要么提供人工键。

  • 您使用 window 个函数而没有 partitionBy。结果,所有数据都重新洗牌到单个分区,在本地排序,并使用单个线程进行处理。

    一般来说,没有通用的解决方案可以仅使用 Dataset API 来解决这个问题,但您可以使用一些技巧:

    • 创建反映所需记录排序的人工分区。我在

      的回答中描述了这种方法

      在您的情况下可以使用类似的方法,但它需要多步过程,相当于下面描述的过程。

    • 使用关联方法,您可以对排序的 RDD 使用两个单独的扫描(应该可以在不从 Dataset 转换的情况下做类似的事情)和其他操作:

      • 计算每个分区的部分结果(在您的情况下,给定分区的排名)。
      • 收集所需的摘要(在您的情况下是分区边界和每个分区的累积排名值)。
      • 执行第二次扫描以更正来自先前分区的部分聚合。

    可以在

  • 中找到此方法的一个示例,可以轻松调整以适合您的情况