Spark Core 如何在不使用 Rdd.max() 的情况下获取 RDD 函数的最大 n 行

Spark Core How to fetch max n rows of an RDD function without using Rdd.max()

我有一个 RDD 具有以下元素:

('09', [25, 66, 67])
('17', [66, 67, 39])
('04', [25])
('08', [120, 122])
('28', [25, 67])
('30', [122])

我需要获取列表中具有最大元素数的元素,在上面的 RDD 中为 3 O/p 应该过滤到另一个 RDD 中,而不是使用 max 函数spark 数据帧:

('09', [25, 66, 67])
('17', [66, 67, 39])
max_len = uniqueRDD.max(lambda x: len(x[1]))
maxRDD = uniqueRDD.filter(lambda x : (len(x[1]) == len(max_len[1])))

我可以使用上面的代码行,但是 Spark Streaming 不支持这个,因为 max_len 是一个元组而不是 RDD

有人可以建议吗?提前致谢

这对你有用吗?我也尝试过滤流式 rdds。似乎有效。

from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark import SparkContext, SQLContext
from pyspark.sql.functions import *
from pyspark.streaming import StreamingContext

sc = SparkContext('local')
sqlContext = SQLContext(sc)
ssc = StreamingContext(sc,1)



data1 = [
    ('09', [25, 66, 67]),
    ('17', [66, 67, 39]),
    ('04', [25]),
    ('08', [120, 122]),
    ('28', [25, 67]),
    ('30', [122])
]


df1Columns = ["id", "list"]
df1 = sqlContext.createDataFrame(data=data1, schema = df1Columns)
df1.show(20, truncate=False)

uniqueRDD = df1.rdd


max_len = uniqueRDD.map(lambda x: len(x[1])).max(lambda x: x)
maxRDD = uniqueRDD.filter(lambda x : (len(x[1]) == max_len))

print("printing out maxlength = ", max_len)

dStream = ssc.queueStream([uniqueRDD])

resultStream = dStream.filter(lambda x : (len(x[1]) == max_len))

print("Printing the filtered streaming result")

def printResultStream(rdd):
    mylist = rdd.collect()
    for ele in mylist:
        print(ele)

resultStream.foreachRDD(printResultStream)

ssc.start()
ssc.awaitTermination()
ssc.stop()

这是输出:

+---+------------+
|id |list        |
+---+------------+
|09 |[25, 66, 67]|
|17 |[66, 67, 39]|
|04 |[25]        |
|08 |[120, 122]  |
|28 |[25, 67]    |
|30 |[122]       |
+---+------------+

printing out maxlength =  3
Printing the filtered streaming result
Row(id='09', list=[25, 66, 67])
Row(id='17', list=[66, 67, 39])

您可以尝试这样的操作:

dStream = ssc.queueStream([uniqueRDD, uniqueRDD, uniqueRDD])
def maxOverRDD(input_rdd):
    if not input_rdd.isEmpty():
        reduced_rdd = input_rdd.reduce(lambda acc, value : value if (len(acc[1]) < len(value[1])) else acc)
        internal_result = input_rdd.filter(lambda x: len(x[1]) == len(reduced_rdd[1]))
        return internal_result


result = dStream.transform(maxOverRDD)
print("Printing the finalStream")
result.foreachRDD(printResultStream)

输出会像(输出是重复的,因为相同的 RDD 在流中提供了 3 次):

Printing the finalStream
Row(id='09', list=[25, 66, 67])
Row(id='17', list=[66, 67, 39])
Row(id='09', list=[25, 66, 67])
Row(id='17', list=[66, 67, 39])
Row(id='09', list=[25, 66, 67])
Row(id='17', list=[66, 67, 39])