如何确定 PySpark 数据帧分区的 "preferred location"?

How to determine "preferred location" for partitions of PySpark dataframe?

我正在尝试了解 coalesce 如何确定如何将初始分区加入最终问题,显然 "preferred location" 与它有关。

根据,Scala Spark 有一个函数preferredLocations(split: Partition) 可以识别这个。但我一点也不熟悉 Spark 的 Scala 方面。有没有办法在 PySpark 级别确定给定行或分区 ID 的首选位置?

是的,理论上是可以的。强制某种形式的偏好的示例数据(可能有一个更简单的示例):

rdd1 = sc.range(10).map(lambda x: (x % 4, None)).partitionBy(8)
rdd2 = sc.range(10).map(lambda x: (x % 4, None)).partitionBy(8)

# Force caching so downstream plan has preferences
rdd1.cache().count()

rdd3 = rdd1.union(rdd2)

现在你可以定义一个助手了:

from pyspark import SparkContext

def prefered_locations(rdd):
    def to_py_generator(xs):
        """Convert Scala List to Python generator"""
        j_iter = xs.iterator()
        while j_iter.hasNext():
            yield j_iter.next()

    # Get JVM
    jvm =  SparkContext._active_spark_context._jvm
    # Get Scala RDD
    srdd = jvm.org.apache.spark.api.java.JavaRDD.toRDD(rdd._jrdd)
    # Get partitions
    partitions = srdd.partitions()
    return {
        p.index(): list(to_py_generator(srdd.preferredLocations(p)))
        for p in partitions
    }

已申请:

prefered_locations(rdd3)

# {0: ['...'],
#  1: ['...'],
#  2: ['...'],
#  3: ['...'],
#  4: [],
#  5: [],
#  6: [],
#  7: []}