使用最新的spark版本时如何设置spark.sql.shuffle.partitions

how to set spark.sql.shuffle.partitions when using the lastest spark version

我想重置 pyspark 代码中的 spark.sql.shuffle.partitions 配置,因为我需要加入两个大表。但是下面的代码在最新的spark版本中不起作用,报错说"no method "setConf" in xxx"

#!/usr/bin/python
# -*- coding: utf-8 -*-
import sys
import pyspark
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext('local')
spark = SparkSession(sc)

spark.sparkContext.setConf("spark.sql.shuffle.partitions", "1000")
spark.sparkContext.setConf("spark.default.parallelism", "1000")

# or using the follow, neither is working 
spark.setConf("spark.sql.shuffle.partitions", "1000")
spark.setConf("spark.default.parallelism", "1000")

我现在想知道如何重置 "spark.sql.shuffle.partitions"。

SparkSession提供了一个RuntimeConfig接口来设置和获取Spark相关参数。你的问题的答案是:

spark.conf.set("spark.sql.shuffle.partitions", 1000)

参考:https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.RuntimeConfig

我没注意到你的问题是关于 pyspark 的。 Pyspark 有一个类似的界面 spark.conf。 参考:https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=sparksession#pyspark.sql.SparkSession.conf

请注意,当随机分区设置为大于 2000 时,我们在 Spark SQL "Group By" / "Distinct" 实现中发现了一个缺陷。我们使用数据集进行了测试大约 3000 条记录,38 列,其中大约 1800 条唯一记录,38 列。

当我们 运行 使用 38 列的 "Distinct" 或 "Group By" 查询并且 "spark.sql.shuffle.partitions" 设置为 2001 时,不同记录的计数小于 1800 ,比如 1794。但是,当我们将其设置为 2000 时,相同的查询给我们的记录数为 1800。

基本上,当 shuffle 分区大于 2000 时,Spark 会错误地丢弃一些记录。

我们使用 Spark v2.3.1 进行了测试,很快就会提交 Bug Jira。我需要准备一个测试数据来演示,但我们已经用我们的真实数据集确认了它。