棘手的 pyspark 值排序
Tricky pyspark value sorting
我想使用 pyaspark 在 rdd(下面给出的示例)中进行排序。
输入 rdd:
[('x', {3: [16, 11, 4532], 0: [5390, 3262]}),
('y', {2: [256, 128, 11], 5: [3262, 987], 3: [12]}),
('z', {17: [126, 54, 9], 0: [7654, 1768], 7: [3292, 1235, 7]})]
输出rdd:
[('x', {0: [3262, 5390], 3: [11, 16, 4532]}),
('y', {2: [11, 128, 256], 3: [12], 5: [987, 3262]}),
('z', {0: [1768, 7654], 7: [7, 1235, 3292], 17: [9, 54, 126]})]
正在进行两次排序。
我不太确定如何使用pyspark 解决上述问题。我已经尝试了下面的代码,但我猜它是不正确的(因为当数据很大时,我认为对所有进行排序不是一个好主意)。请帮忙。
def sort_values(x):
return (x[0], dict(sorted(x[1].items())))
rdd1 = input_rdd.map(lambda x: sort_values(x))
这里有一些你可以做的事情
pdf = pd.DataFrame({'val': [('x', {3: [16, 11, 4532], 0: [5390, 3262]}), ('y', {5: [3262, 987], 3: [12], 2: [256, 128, 11]}), ('z',{17: [126, 54, 9], 7: [3292, 1235, 7], 0:[7654, 1768]})]})
val
0 (x, {3: [16, 11, 4532], 0: [5390, 3262]})
1 (y, {5: [3262, 987], 3: [12], 2: [256, 128, 11]})
2 (z, {17: [126, 54, 9], 7: [3292, 1235, 7], 0: [7654, 1768]})
我们无法在输出中使用 Map,因此需要在排序后在内部转换为 tuple()
df=spark.createDataFrame(pdf)
def sort_dict_f(x):
sorted_array = []
for key in sorted(x[1].keys(), reverse=False): #sort the key
sorted_array.append( (key, sorted(x[1][key]) )) #sort each internal list
return (x[0], sorted_array)
schema = StructType([
StructField("word", StringType(), False),
StructField("vals", ArrayType( StructType([StructField('key', IntegerType(), False), StructField('subs', ArrayType(IntegerType()), False)])), False)
])
SorterUDF = F.udf(sort_dict_f, schema)
df2 = df.withColumn('sorted', SorterUDF("val"))
df2.show(20, False)
最终结果如下所示:
+------------------------------------------------------------------+------------------------------------------------------------------+
|val |sorted |
+------------------------------------------------------------------+------------------------------------------------------------------+
|[x, [0 -> [5390, 3262], 2 -> [9, 8, 7], 3 -> [16, 11, 4532]]] |[x, [[0, [3262, 5390]], [2, [7, 8, 9]], [3, [11, 16, 4532]]]] |
|[y, [2 -> [256, 128, 11], 3 -> [12], 5 -> [3262, 987]]] |[y, [[2, [11, 128, 256]], [3, [12]], [5, [987, 3262]]]] |
|[z, [0 -> [7654, 1768], 17 -> [126, 54, 9], 7 -> [3292, 1235, 7]]]|[z, [[0, [1768, 7654]], [7, [7, 1235, 3292]], [17, [9, 54, 126]]]]|
+------------------------------------------------------------------+------------------------------------------------------------------+
这里是使用你可以在rdd上使用的mapPartitions函数的例子。
mapPartitions 对 rdd 的每个分区应用一个函数。
下面的代码将有助于实现分区级排序,即内部字典将被排序 -
rdd = sc.parallelize([('x', {3: [16, 11, 4532], 0: [5390, 3262]}), ('y', {5: [3262, 987], 3: [12], 2: [256, 128, 11]}),('a', {2: [16, 4, 456], 7: [343, 3262]}),('z', {0: [1768, 7654], 7: [7, 1235, 3292], 17: [9, 54, 126]})],4)
def sortedpartition(iterator):
sorted_rdd_partition=[]
for item in iterator:
word=item[0]
values = item[1]
orderDict={}
for key in sorted(values.keys()):
orderDict[key]=sorted(values[key])
sorted_rdd_partition.append((word,orderDict))
return sorted_rdd_partition
rdd.mapPartitions(sortedpartition).collect()
最终结果如下所示
Output -
[('x', {0: [3262, 5390], 3: [11, 16, 4532]}),
('y', {2: [11, 128, 256], 3: [12], 5: [987, 3262]}),
('a', {2: [4, 16, 456], 7: [343, 3262]}),
('z', {0: [1768, 7654], 7: [7, 1235, 3292], 17: [9, 54, 126]})]
如果您需要 rdd 级别排序,请使用下面的代码行
rdd.mapPartitions(sortedpartition).sortBy(lambda x: x[0]).collect()
or
rdd.mapPartitions(sortedpartition).sortByKey().collect()
输出-
[('a', {2: [4, 16, 456], 7: [343, 3262]}),
('x', {0: [3262, 5390], 3: [11, 16, 4532]}),
('y', {2: [11, 128, 256], 3: [12], 5: [987, 3262]}),
('z', {0: [1768, 7654], 7: [7, 1235, 3292], 17: [9, 54, 126]})]
我想使用 pyaspark 在 rdd(下面给出的示例)中进行排序。 输入 rdd:
[('x', {3: [16, 11, 4532], 0: [5390, 3262]}),
('y', {2: [256, 128, 11], 5: [3262, 987], 3: [12]}),
('z', {17: [126, 54, 9], 0: [7654, 1768], 7: [3292, 1235, 7]})]
输出rdd:
[('x', {0: [3262, 5390], 3: [11, 16, 4532]}),
('y', {2: [11, 128, 256], 3: [12], 5: [987, 3262]}),
('z', {0: [1768, 7654], 7: [7, 1235, 3292], 17: [9, 54, 126]})]
正在进行两次排序。
我不太确定如何使用pyspark 解决上述问题。我已经尝试了下面的代码,但我猜它是不正确的(因为当数据很大时,我认为对所有进行排序不是一个好主意)。请帮忙。
def sort_values(x):
return (x[0], dict(sorted(x[1].items())))
rdd1 = input_rdd.map(lambda x: sort_values(x))
这里有一些你可以做的事情
pdf = pd.DataFrame({'val': [('x', {3: [16, 11, 4532], 0: [5390, 3262]}), ('y', {5: [3262, 987], 3: [12], 2: [256, 128, 11]}), ('z',{17: [126, 54, 9], 7: [3292, 1235, 7], 0:[7654, 1768]})]})
val
0 (x, {3: [16, 11, 4532], 0: [5390, 3262]})
1 (y, {5: [3262, 987], 3: [12], 2: [256, 128, 11]})
2 (z, {17: [126, 54, 9], 7: [3292, 1235, 7], 0: [7654, 1768]})
我们无法在输出中使用 Map,因此需要在排序后在内部转换为 tuple()
df=spark.createDataFrame(pdf)
def sort_dict_f(x):
sorted_array = []
for key in sorted(x[1].keys(), reverse=False): #sort the key
sorted_array.append( (key, sorted(x[1][key]) )) #sort each internal list
return (x[0], sorted_array)
schema = StructType([
StructField("word", StringType(), False),
StructField("vals", ArrayType( StructType([StructField('key', IntegerType(), False), StructField('subs', ArrayType(IntegerType()), False)])), False)
])
SorterUDF = F.udf(sort_dict_f, schema)
df2 = df.withColumn('sorted', SorterUDF("val"))
df2.show(20, False)
最终结果如下所示:
+------------------------------------------------------------------+------------------------------------------------------------------+
|val |sorted |
+------------------------------------------------------------------+------------------------------------------------------------------+
|[x, [0 -> [5390, 3262], 2 -> [9, 8, 7], 3 -> [16, 11, 4532]]] |[x, [[0, [3262, 5390]], [2, [7, 8, 9]], [3, [11, 16, 4532]]]] |
|[y, [2 -> [256, 128, 11], 3 -> [12], 5 -> [3262, 987]]] |[y, [[2, [11, 128, 256]], [3, [12]], [5, [987, 3262]]]] |
|[z, [0 -> [7654, 1768], 17 -> [126, 54, 9], 7 -> [3292, 1235, 7]]]|[z, [[0, [1768, 7654]], [7, [7, 1235, 3292]], [17, [9, 54, 126]]]]|
+------------------------------------------------------------------+------------------------------------------------------------------+
这里是使用你可以在rdd上使用的mapPartitions函数的例子。 mapPartitions 对 rdd 的每个分区应用一个函数。
下面的代码将有助于实现分区级排序,即内部字典将被排序 -
rdd = sc.parallelize([('x', {3: [16, 11, 4532], 0: [5390, 3262]}), ('y', {5: [3262, 987], 3: [12], 2: [256, 128, 11]}),('a', {2: [16, 4, 456], 7: [343, 3262]}),('z', {0: [1768, 7654], 7: [7, 1235, 3292], 17: [9, 54, 126]})],4)
def sortedpartition(iterator):
sorted_rdd_partition=[]
for item in iterator:
word=item[0]
values = item[1]
orderDict={}
for key in sorted(values.keys()):
orderDict[key]=sorted(values[key])
sorted_rdd_partition.append((word,orderDict))
return sorted_rdd_partition
rdd.mapPartitions(sortedpartition).collect()
最终结果如下所示
Output -
[('x', {0: [3262, 5390], 3: [11, 16, 4532]}),
('y', {2: [11, 128, 256], 3: [12], 5: [987, 3262]}),
('a', {2: [4, 16, 456], 7: [343, 3262]}),
('z', {0: [1768, 7654], 7: [7, 1235, 3292], 17: [9, 54, 126]})]
如果您需要 rdd 级别排序,请使用下面的代码行
rdd.mapPartitions(sortedpartition).sortBy(lambda x: x[0]).collect()
or
rdd.mapPartitions(sortedpartition).sortByKey().collect()
输出-
[('a', {2: [4, 16, 456], 7: [343, 3262]}),
('x', {0: [3262, 5390], 3: [11, 16, 4532]}),
('y', {2: [11, 128, 256], 3: [12], 5: [987, 3262]}),
('z', {0: [1768, 7654], 7: [7, 1235, 3292], 17: [9, 54, 126]})]