如何确定 PySpark 数据帧分区的 "preferred location"?
How to determine "preferred location" for partitions of PySpark dataframe?
我正在尝试了解 coalesce
如何确定如何将初始分区加入最终问题,显然 "preferred location" 与它有关。
根据,Scala Spark 有一个函数preferredLocations(split: Partition)
可以识别这个。但我一点也不熟悉 Spark 的 Scala 方面。有没有办法在 PySpark 级别确定给定行或分区 ID 的首选位置?
是的,理论上是可以的。强制某种形式的偏好的示例数据(可能有一个更简单的示例):
rdd1 = sc.range(10).map(lambda x: (x % 4, None)).partitionBy(8)
rdd2 = sc.range(10).map(lambda x: (x % 4, None)).partitionBy(8)
# Force caching so downstream plan has preferences
rdd1.cache().count()
rdd3 = rdd1.union(rdd2)
现在你可以定义一个助手了:
from pyspark import SparkContext
def prefered_locations(rdd):
def to_py_generator(xs):
"""Convert Scala List to Python generator"""
j_iter = xs.iterator()
while j_iter.hasNext():
yield j_iter.next()
# Get JVM
jvm = SparkContext._active_spark_context._jvm
# Get Scala RDD
srdd = jvm.org.apache.spark.api.java.JavaRDD.toRDD(rdd._jrdd)
# Get partitions
partitions = srdd.partitions()
return {
p.index(): list(to_py_generator(srdd.preferredLocations(p)))
for p in partitions
}
已申请:
prefered_locations(rdd3)
# {0: ['...'],
# 1: ['...'],
# 2: ['...'],
# 3: ['...'],
# 4: [],
# 5: [],
# 6: [],
# 7: []}
我正在尝试了解 coalesce
如何确定如何将初始分区加入最终问题,显然 "preferred location" 与它有关。
根据preferredLocations(split: Partition)
可以识别这个。但我一点也不熟悉 Spark 的 Scala 方面。有没有办法在 PySpark 级别确定给定行或分区 ID 的首选位置?
是的,理论上是可以的。强制某种形式的偏好的示例数据(可能有一个更简单的示例):
rdd1 = sc.range(10).map(lambda x: (x % 4, None)).partitionBy(8)
rdd2 = sc.range(10).map(lambda x: (x % 4, None)).partitionBy(8)
# Force caching so downstream plan has preferences
rdd1.cache().count()
rdd3 = rdd1.union(rdd2)
现在你可以定义一个助手了:
from pyspark import SparkContext
def prefered_locations(rdd):
def to_py_generator(xs):
"""Convert Scala List to Python generator"""
j_iter = xs.iterator()
while j_iter.hasNext():
yield j_iter.next()
# Get JVM
jvm = SparkContext._active_spark_context._jvm
# Get Scala RDD
srdd = jvm.org.apache.spark.api.java.JavaRDD.toRDD(rdd._jrdd)
# Get partitions
partitions = srdd.partitions()
return {
p.index(): list(to_py_generator(srdd.preferredLocations(p)))
for p in partitions
}
已申请:
prefered_locations(rdd3)
# {0: ['...'],
# 1: ['...'],
# 2: ['...'],
# 3: ['...'],
# 4: [],
# 5: [],
# 6: [],
# 7: []}