单独分区中 RDD 的数据帧列表

List of dataframes to RDD in separate partition

我有一个 spark 数据帧列表,我必须对它们执行一些操作 我想从中创建一个 rdd,这样每个数据帧都进入一个单独的分区,这样我就可以简单地在这个 rdd 上使用 mapPartitions 在不同的节点上并行地对每个数据帧执行计算。

下面是实现此目的的一些代码。一般方法只是将所有数据合并在一起,并添加一个 source 列来标记每一行的来源。联合调用不应该改变 DataFrame 的分区,只是将所有分区组合成一个超级 DataFrame。如果你确实有导致重新洗牌的东西,你可以使用 spark_partition_id() 添加一个具有原始分区 ID 的列,然后在 sourcepartition_id 列上调用 repartition

from pyspark.sql.functions import struct, lit, col

df1 = sc.parallelize([
    (1, 2, 3),
    (2, 3, 4)
]).toDF(["col1", "col2", "col3"])

df2 = sc.parallelize([
    (3, 4, 5),
    (4, 5, 6)
]).toDF(["col1", "col2", "col3"])

# Setup the DF's for union.  Their columns need to be in the same order and
# add a source column
df1_union = df1.select(lit("df1").alias("source"), *[col(c) for c in sorted(df1.columns)])
df2_union = df2.select(lit("df2").alias("source"), *[col(c) for c in sorted(df2.columns)])

# You could do this instead if the schemas are different
# df1_union = df1.select(lit("df1").alias("source"), struct(*df1.columns).alias("df1"), lit(None).alias("df2"))
# df2_union = df2.select(lit("df2").alias("source"), lit(None).alias("df1"), struct(*df2.columns).alias("df2"))

combined = df1_union.unionAll(df2_union) 

combined.show()
combined.rdd.mapPartitions(lambda row: do whatever..)

注意,合并后的数据如下所示:

+------+----+----+----+
|source|col1|col2|col3|
+------+----+----+----+
|   df1|   1|   2|   3|
|   df1|   2|   3|   4|
|   df2|   3|   4|   5|
|   df2|   4|   5|   6|
+------+----+----+----+