棘手的 pyspark 值排序

Tricky pyspark value sorting

我想使用 pyaspark 在 rdd(下面给出的示例)中进行排序。 输入 rdd:

[('x', {3: [16, 11, 4532], 0: [5390, 3262]}),
('y', {2: [256, 128, 11], 5: [3262, 987], 3: [12]}),
('z', {17: [126, 54, 9], 0: [7654, 1768], 7: [3292, 1235, 7]})]

输出rdd:

[('x', {0: [3262, 5390], 3: [11, 16, 4532]}),
('y', {2: [11, 128, 256], 3: [12], 5: [987, 3262]}),
('z', {0: [1768, 7654], 7: [7, 1235, 3292], 17: [9, 54, 126]})]

正在进行两次排序。

我不太确定如何使用pyspark 解决上述问题。我已经尝试了下面的代码,但我猜它是不正确的(因为当数据很大时,我认为对所有进行排序不是一个好主意)。请帮忙。

def sort_values(x):
    return (x[0], dict(sorted(x[1].items())))    
rdd1 = input_rdd.map(lambda x: sort_values(x))

这里有一些你可以做的事情

pdf = pd.DataFrame({'val': [('x', {3: [16, 11, 4532], 0: [5390, 3262]}), ('y', {5: [3262, 987], 3: [12], 2: [256, 128, 11]}), ('z',{17: [126, 54, 9], 7: [3292, 1235, 7], 0:[7654, 1768]})]})

    val
0   (x, {3: [16, 11, 4532], 0: [5390, 3262]})
1   (y, {5: [3262, 987], 3: [12], 2: [256, 128, 11]})
2   (z, {17: [126, 54, 9], 7: [3292, 1235, 7], 0: [7654, 1768]})

我们无法在输出中使用 Map,因此需要在排序后在内部转换为 tuple()

df=spark.createDataFrame(pdf)

def sort_dict_f(x):
    sorted_array = []
    for key in sorted(x[1].keys(), reverse=False): #sort the key
        sorted_array.append( (key, sorted(x[1][key]) )) #sort each internal list
    return (x[0], sorted_array)

schema = StructType([
    StructField("word", StringType(), False),
    StructField("vals", ArrayType( StructType([StructField('key', IntegerType(), False), StructField('subs', ArrayType(IntegerType()), False)])), False) 
])


SorterUDF = F.udf(sort_dict_f, schema)
df2 = df.withColumn('sorted', SorterUDF("val"))
df2.show(20, False)

最终结果如下所示:

+------------------------------------------------------------------+------------------------------------------------------------------+
|val                                                               |sorted                                                            |
+------------------------------------------------------------------+------------------------------------------------------------------+
|[x, [0 -> [5390, 3262], 2 -> [9, 8, 7], 3 -> [16, 11, 4532]]]     |[x, [[0, [3262, 5390]], [2, [7, 8, 9]], [3, [11, 16, 4532]]]]     |
|[y, [2 -> [256, 128, 11], 3 -> [12], 5 -> [3262, 987]]]           |[y, [[2, [11, 128, 256]], [3, [12]], [5, [987, 3262]]]]           |
|[z, [0 -> [7654, 1768], 17 -> [126, 54, 9], 7 -> [3292, 1235, 7]]]|[z, [[0, [1768, 7654]], [7, [7, 1235, 3292]], [17, [9, 54, 126]]]]|
+------------------------------------------------------------------+------------------------------------------------------------------+

这里是使用你可以在rdd上使用的mapPartitions函数的例子。 mapPartitions 对 rdd 的每个分区应用一个函数。

下面的代码将有助于实现分区级排序,即内部字典将被排序 -

rdd = sc.parallelize([('x', {3: [16, 11, 4532], 0: [5390, 3262]}), ('y', {5: [3262, 987], 3: [12], 2: [256, 128, 11]}),('a', {2: [16, 4, 456], 7: [343, 3262]}),('z', {0: [1768, 7654], 7: [7, 1235, 3292], 17: [9, 54, 126]})],4)

def sortedpartition(iterator):
    sorted_rdd_partition=[]
    for item in iterator:
        word=item[0]
        values = item[1]
        orderDict={}
        for key in sorted(values.keys()):
            orderDict[key]=sorted(values[key])
        sorted_rdd_partition.append((word,orderDict))
    return sorted_rdd_partition

rdd.mapPartitions(sortedpartition).collect()

最终结果如下所示

Output - 
[('x', {0: [3262, 5390], 3: [11, 16, 4532]}),
 ('y', {2: [11, 128, 256], 3: [12], 5: [987, 3262]}),
 ('a', {2: [4, 16, 456], 7: [343, 3262]}),
 ('z', {0: [1768, 7654], 7: [7, 1235, 3292], 17: [9, 54, 126]})]

如果您需要 rdd 级别排序,请使用下面的代码行

rdd.mapPartitions(sortedpartition).sortBy(lambda x: x[0]).collect()
or 
rdd.mapPartitions(sortedpartition).sortByKey().collect()

输出-

[('a', {2: [4, 16, 456], 7: [343, 3262]}),
 ('x', {0: [3262, 5390], 3: [11, 16, 4532]}),
 ('y', {2: [11, 128, 256], 3: [12], 5: [987, 3262]}),
 ('z', {0: [1768, 7654], 7: [7, 1235, 3292], 17: [9, 54, 126]})]