Spark - RDD 在节点上的等分
Spark - Equal partitioning of RDD on nodes
我有一个有 4 个节点的架构和一个有 4000 行的 RDD,我需要在节点上平均地重新分区这个 RDD。结果应该是:
node 1 -> 1000 rows
node 2 -> 1000 rows
node 3 -> 1000 rows
node 4 -> 1000 rows.
如何在 Python 中执行此操作?
如果您使用文本文件构建 rdd,您可以使用:
scala> val rdd = sc.textFile("hdfs://.../input.txt", 4)
或者您可以使用:
scala> rdd = rdd.repartition(4)
rdd.repartition(n) 做一个shuffle来拆分数据以匹配n个分区。
我尝试使用 pyspark 实施@sramalingam24 已经提出的解决方案,因为您已经在使用它。
from collections import Counter
data = [(i,j) for i,j in zip([i/1000 for i in range(0, 4000, 1)], range(500, 4500, 1))]
rdd = sc.parallelize(data).map(lambda x : (x[0], x[1]))
df = sqlContext.createDataFrame(rdd, ['key', 'values'])
df = df.repartition('key')
检查结果:
Counter(df.select(spark_partition_id()).collect())
Out[*]: Counter({Row(SPARK_PARTITION_ID()=5): 1000, Row(SPARK_PARTITION_ID()=128): 1000, Row(SPARK_PARTITION_ID()=107): 1000, Row(SPARK_PARTITION_ID()=69): 1000})
我有一个有 4 个节点的架构和一个有 4000 行的 RDD,我需要在节点上平均地重新分区这个 RDD。结果应该是:
node 1 -> 1000 rows
node 2 -> 1000 rows
node 3 -> 1000 rows
node 4 -> 1000 rows.
如何在 Python 中执行此操作?
如果您使用文本文件构建 rdd,您可以使用:
scala> val rdd = sc.textFile("hdfs://.../input.txt", 4)
或者您可以使用:
scala> rdd = rdd.repartition(4)
rdd.repartition(n) 做一个shuffle来拆分数据以匹配n个分区。
我尝试使用 pyspark 实施@sramalingam24 已经提出的解决方案,因为您已经在使用它。
from collections import Counter
data = [(i,j) for i,j in zip([i/1000 for i in range(0, 4000, 1)], range(500, 4500, 1))]
rdd = sc.parallelize(data).map(lambda x : (x[0], x[1]))
df = sqlContext.createDataFrame(rdd, ['key', 'values'])
df = df.repartition('key')
检查结果:
Counter(df.select(spark_partition_id()).collect())
Out[*]: Counter({Row(SPARK_PARTITION_ID()=5): 1000, Row(SPARK_PARTITION_ID()=128): 1000, Row(SPARK_PARTITION_ID()=107): 1000, Row(SPARK_PARTITION_ID()=69): 1000})