如何在 PySpark DataFrame 中强制进行特定分区?
How to force a certain partitioning in a PySpark DataFrame?
编辑 2022/02/18:几年后我又回到了这个问题,我相信我下面的新解决方案比当前投票最高的解决方案性能要好得多。
假设我有一个包含列 partition_id
:
的 DataFrame
n_partitions = 2
df = spark.sparkContext.parallelize([
[1, 'A'],
[1, 'B'],
[2, 'A'],
[2, 'C']
]).toDF(('partition_id', 'val'))
我如何重新分区 DataFrame 以保证 partition_id
的每个值都进入一个单独的分区,并且实际分区的数量与 partition_id
的不同值一样多?
如果我做一个散列分区,即 df.repartition(n_partitions, 'partition_id')
,这保证了正确的分区数量,但由于散列冲突,一些分区可能是空的,而其他分区可能包含 partition_id
的多个值。
Python 和 DataFrame
API 没有这样的选项。 Dataset
中的分区 API 不可插入,仅支持预定义的 .
您可以将数据转换为 RDD
,使用自定义分区程序进行分区,然后读取转换回 DataFrame
:
from pyspark.sql.functions import col, struct, spark_partition_id
mapping = {k: i for i, k in enumerate(
df.select("partition_id").distinct().rdd.flatMap(lambda x: x).collect()
)}
result = (df
.select("partition_id", struct([c for c in df.columns]))
.rdd.partitionBy(len(mapping), lambda k: mapping[k])
.values()
.toDF(df.schema))
result.withColumn("actual_partition_id", spark_partition_id()).show()
# +------------+---+-------------------+
# |partition_id|val|actual_partition_id|
# +------------+---+-------------------+
# | 1| A| 0|
# | 1| B| 0|
# | 2| A| 1|
# | 2| C| 1|
# +------------+---+-------------------+
请记住,这只会创建特定的数据分布,不会设置可供 Catalyst 优化器使用的分区器。
之前接受的解决方案需要从 DataFrame 转换为 RDD 并返回,但由于需要重新分区,速度非常慢。
下面给出的解决方案的性能要高得多——额外的 Spark 操作非常快,因此总体而言,它不需要 compute/shuffle 比简单的重新分区更多。
在高层次上,我们使用迭代算法来反转 Spark 的分区哈希,然后使用这个反转映射来创建新的分区键(当分区时)给出分区的预期分布。
import itertools
from pyspark.sql import Row
import pyspark.sql.functions as F
def construct_reverse_hash_map(spark, n_partitions, fact = 10):
"""
Given a target number of partitions, this function constructs a
mapping from each integer partition ID (0 through N-1) to an
arbitrary integer, which Spark will hash to that partition ID.
By using these new (seemingly arbitrary) integers as a column
to repartition on, one can guarantee a 1-to-1 mapping of
partition levels to the final partitions.
Example return value, for n_partitions=10:
{
5: 80,
9: 90,
8: 94,
7: 99,
0: 92,
1: 98,
6: 87,
2: 91,
3: 85,
4: 93
}
If one had a column in a dataframe with 10 unique values, 80, 90, 94,
etc, and then partitioned on this column into 10 partitions, then
every row with value 80 would go into partition 5, every row with
value 90 would go into partition 9, and so on.
:param spark: SparkSession object
:param n_partitions: desired number of unique partitions
:param fact: initial search space of IDs will be n_partitions*fact
:return: dictionary mapping from sequential partition IDs to hashed
partition IDs.
"""
max_retries = 10
for i in range(max_retries):
bigger_factor = fact * 2 ** i
hashes = (
spark.createDataFrame([Row(orig_id=i) for i in list(range(n_partitions * bigger_factor))])
.withColumn("h", F.hash("orig_id") % n_partitions)
.select("orig_id", F.when(F.col("h") >= 0, F.col("h")).otherwise(F.col("h") + n_partitions).alias("new_id"))
)
n_unique_ids = hashes.groupBy("new_id").count().count()
if n_unique_ids == n_partitions:
# find a mapping between the hashed values and the original partition IDs
return {row["new_id"]: row["orig_id"] for row in hashes.collect()}
raise Exception("Spark reverse hash algorithm failed to converge")
def add_deterministic_1to1_partitioner(df, original_part_col, new_part_col, part_levels, seed=42):
"""
Returns a DataFrame with a new column which can be repartitioned on to give exactly the desired partitions. We determine what
values this column will have by inverting Spark's hash.
:param df: original DataFrame
:param original_part_col: logical column to be repartitioned on
:param new_part_col: new column to be actually repartitioned on
:param part_levels: list of unique values of part_col
:param seed: seed value for quasirandom assignment to partitions
:return: original DataFrame plus new column for repartitioning
"""
part_level_map = {part_level: i for i, part_level in enumerate(part_levels)}
part_level_map_expr = F.create_map(*[F.lit(x) for x in itertools.chain(*list(part_level_map.items()))])
hash_map = construct_reverse_hash_map(df.sql_ctx.sparkSession, len(part_level_map))
hash_map_expr = F.create_map(*[F.lit(x) for x in itertools.chain(*list(hash_map.items()))])
return (
# convert partition level to sequential numeric partition ID
df.withColumn("__part_id__", part_level_map_expr[F.col(original_part_col)].astype("bigint"))
# add col which will result in 1-to-1 partitioning when repartitioend on
.withColumn(new_part_col, hash_map_expr[F.col("__part_id__")].astype("bigint"))
.drop("__part_id__")
)
演示功能:
# construct example DataFrame
data = [
[1, 'A0'],
[1, 'A1'],
[2, 'B0'],
[2, 'B1'],
[3, 'C0'],
[3, 'C1'],
]
partition_levels = list(set([pid for pid, _ in data]))
n_partitions = len(partition_levels)
df = spark.sparkContext.parallelize(data).toDF(('partition_id', 'val'))
对所需分区列的简单重新分区会导致冲突——请注意,分区 ID 为 1 和 2 的行都被放入分区 2:
df_naive_repartition = df.repartition(n_partitions, "partition_id").withColumn("actual_partition_id", F.spark_partition_id())
df_naive_repartition.orderBy("partition_id", "val").show()
#+------------+---+-------------------+
#|partition_id|val|actual_partition_id|
#+------------+---+-------------------+
#| 1| A0| 2|
#| 1| A1| 2|
#| 2| B0| 2|
#| 2| B1| 2|
#| 3| C0| 0|
#| 3| C1| 0|
#+------------+---+-------------------+
而添加确定性分区键然后用它重新分区会导致每个组都分配给一个分区:
df = add_deterministic_1to1_partitioner(df, "partition_id", "deterministic_partition_id", partition_levels)
df_1to1_repartition = df.repartition(n_partitions, "deterministic_partition_id").withColumn("actual_partition_id", F.spark_partition_id())
df_1to1_repartition.orderBy("partition_id", "val").show()
#+------------+---+--------------------------+-------------------+
#|partition_id|val|deterministic_partition_id|actual_partition_id|
#+------------+---+--------------------------+-------------------+
#| 1| A0| 28| 0|
#| 1| A1| 28| 0|
#| 2| B0| 29| 1|
#| 2| B1| 29| 1|
#| 3| C0| 27| 2|
#| 3| C1| 27| 2|
#+------------+---+--------------------------+-------------------+
(deterministic_partition_id
列可以在重新分区后删除——我在这里显示它只是为了让哈希映射函数的工作原理更加清晰。)
编辑 2022/02/18:几年后我又回到了这个问题,我相信我下面的新解决方案比当前投票最高的解决方案性能要好得多。
假设我有一个包含列 partition_id
:
n_partitions = 2
df = spark.sparkContext.parallelize([
[1, 'A'],
[1, 'B'],
[2, 'A'],
[2, 'C']
]).toDF(('partition_id', 'val'))
我如何重新分区 DataFrame 以保证 partition_id
的每个值都进入一个单独的分区,并且实际分区的数量与 partition_id
的不同值一样多?
如果我做一个散列分区,即 df.repartition(n_partitions, 'partition_id')
,这保证了正确的分区数量,但由于散列冲突,一些分区可能是空的,而其他分区可能包含 partition_id
的多个值。
Python 和 DataFrame
API 没有这样的选项。 Dataset
中的分区 API 不可插入,仅支持预定义的
您可以将数据转换为 RDD
,使用自定义分区程序进行分区,然后读取转换回 DataFrame
:
from pyspark.sql.functions import col, struct, spark_partition_id
mapping = {k: i for i, k in enumerate(
df.select("partition_id").distinct().rdd.flatMap(lambda x: x).collect()
)}
result = (df
.select("partition_id", struct([c for c in df.columns]))
.rdd.partitionBy(len(mapping), lambda k: mapping[k])
.values()
.toDF(df.schema))
result.withColumn("actual_partition_id", spark_partition_id()).show()
# +------------+---+-------------------+
# |partition_id|val|actual_partition_id|
# +------------+---+-------------------+
# | 1| A| 0|
# | 1| B| 0|
# | 2| A| 1|
# | 2| C| 1|
# +------------+---+-------------------+
请记住,这只会创建特定的数据分布,不会设置可供 Catalyst 优化器使用的分区器。
之前接受的解决方案需要从 DataFrame 转换为 RDD 并返回,但由于需要重新分区,速度非常慢。
下面给出的解决方案的性能要高得多——额外的 Spark 操作非常快,因此总体而言,它不需要 compute/shuffle 比简单的重新分区更多。
在高层次上,我们使用迭代算法来反转 Spark 的分区哈希,然后使用这个反转映射来创建新的分区键(当分区时)给出分区的预期分布。
import itertools
from pyspark.sql import Row
import pyspark.sql.functions as F
def construct_reverse_hash_map(spark, n_partitions, fact = 10):
"""
Given a target number of partitions, this function constructs a
mapping from each integer partition ID (0 through N-1) to an
arbitrary integer, which Spark will hash to that partition ID.
By using these new (seemingly arbitrary) integers as a column
to repartition on, one can guarantee a 1-to-1 mapping of
partition levels to the final partitions.
Example return value, for n_partitions=10:
{
5: 80,
9: 90,
8: 94,
7: 99,
0: 92,
1: 98,
6: 87,
2: 91,
3: 85,
4: 93
}
If one had a column in a dataframe with 10 unique values, 80, 90, 94,
etc, and then partitioned on this column into 10 partitions, then
every row with value 80 would go into partition 5, every row with
value 90 would go into partition 9, and so on.
:param spark: SparkSession object
:param n_partitions: desired number of unique partitions
:param fact: initial search space of IDs will be n_partitions*fact
:return: dictionary mapping from sequential partition IDs to hashed
partition IDs.
"""
max_retries = 10
for i in range(max_retries):
bigger_factor = fact * 2 ** i
hashes = (
spark.createDataFrame([Row(orig_id=i) for i in list(range(n_partitions * bigger_factor))])
.withColumn("h", F.hash("orig_id") % n_partitions)
.select("orig_id", F.when(F.col("h") >= 0, F.col("h")).otherwise(F.col("h") + n_partitions).alias("new_id"))
)
n_unique_ids = hashes.groupBy("new_id").count().count()
if n_unique_ids == n_partitions:
# find a mapping between the hashed values and the original partition IDs
return {row["new_id"]: row["orig_id"] for row in hashes.collect()}
raise Exception("Spark reverse hash algorithm failed to converge")
def add_deterministic_1to1_partitioner(df, original_part_col, new_part_col, part_levels, seed=42):
"""
Returns a DataFrame with a new column which can be repartitioned on to give exactly the desired partitions. We determine what
values this column will have by inverting Spark's hash.
:param df: original DataFrame
:param original_part_col: logical column to be repartitioned on
:param new_part_col: new column to be actually repartitioned on
:param part_levels: list of unique values of part_col
:param seed: seed value for quasirandom assignment to partitions
:return: original DataFrame plus new column for repartitioning
"""
part_level_map = {part_level: i for i, part_level in enumerate(part_levels)}
part_level_map_expr = F.create_map(*[F.lit(x) for x in itertools.chain(*list(part_level_map.items()))])
hash_map = construct_reverse_hash_map(df.sql_ctx.sparkSession, len(part_level_map))
hash_map_expr = F.create_map(*[F.lit(x) for x in itertools.chain(*list(hash_map.items()))])
return (
# convert partition level to sequential numeric partition ID
df.withColumn("__part_id__", part_level_map_expr[F.col(original_part_col)].astype("bigint"))
# add col which will result in 1-to-1 partitioning when repartitioend on
.withColumn(new_part_col, hash_map_expr[F.col("__part_id__")].astype("bigint"))
.drop("__part_id__")
)
演示功能:
# construct example DataFrame
data = [
[1, 'A0'],
[1, 'A1'],
[2, 'B0'],
[2, 'B1'],
[3, 'C0'],
[3, 'C1'],
]
partition_levels = list(set([pid for pid, _ in data]))
n_partitions = len(partition_levels)
df = spark.sparkContext.parallelize(data).toDF(('partition_id', 'val'))
对所需分区列的简单重新分区会导致冲突——请注意,分区 ID 为 1 和 2 的行都被放入分区 2:
df_naive_repartition = df.repartition(n_partitions, "partition_id").withColumn("actual_partition_id", F.spark_partition_id())
df_naive_repartition.orderBy("partition_id", "val").show()
#+------------+---+-------------------+
#|partition_id|val|actual_partition_id|
#+------------+---+-------------------+
#| 1| A0| 2|
#| 1| A1| 2|
#| 2| B0| 2|
#| 2| B1| 2|
#| 3| C0| 0|
#| 3| C1| 0|
#+------------+---+-------------------+
而添加确定性分区键然后用它重新分区会导致每个组都分配给一个分区:
df = add_deterministic_1to1_partitioner(df, "partition_id", "deterministic_partition_id", partition_levels)
df_1to1_repartition = df.repartition(n_partitions, "deterministic_partition_id").withColumn("actual_partition_id", F.spark_partition_id())
df_1to1_repartition.orderBy("partition_id", "val").show()
#+------------+---+--------------------------+-------------------+
#|partition_id|val|deterministic_partition_id|actual_partition_id|
#+------------+---+--------------------------+-------------------+
#| 1| A0| 28| 0|
#| 1| A1| 28| 0|
#| 2| B0| 29| 1|
#| 2| B1| 29| 1|
#| 3| C0| 27| 2|
#| 3| C1| 27| 2|
#+------------+---+--------------------------+-------------------+
(deterministic_partition_id
列可以在重新分区后删除——我在这里显示它只是为了让哈希映射函数的工作原理更加清晰。)