Spark Core: How to fetch the max n rows of an RDD without using Rdd.max()
I have an RDD with the following elements:
('09', [25, 66, 67])
('17', [66, 67, 39])
('04', [25])
('08', [120, 122])
('28', [25, 67])
('30', [122])
I need to fetch the elements whose list has the maximum number of items, which is 3 in the RDD above. The output should be filtered into another RDD, without using the max function or Spark DataFrames:
('09', [25, 66, 67])
('17', [66, 67, 39])
max_len = uniqueRDD.max(lambda x: len(x[1]))
maxRDD = uniqueRDD.filter(lambda x : (len(x[1]) == len(max_len[1])))
I can use the lines of code above in batch mode, but Spark Streaming does not support this, because max() is an action that returns a tuple on the driver rather than an RDD.
Can anyone suggest an approach? Thanks in advance.
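For reference, a reduce-based batch equivalent that avoids max() entirely could look like this (a minimal sketch, assuming uniqueRDD holds (id, list) pairs as above):

# Find the longest list length with reduce instead of max(), then filter on it.
max_len = uniqueRDD.map(lambda x: len(x[1])).reduce(lambda a, b: a if a > b else b)
maxRDD = uniqueRDD.filter(lambda x: len(x[1]) == max_len)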
Does this work for you? I tried filtering streaming RDDs as well, and it seems to work.
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark import SparkContext, SQLContext
from pyspark.sql.functions import *
from pyspark.streaming import StreamingContext

sc = SparkContext('local')
sqlContext = SQLContext(sc)
ssc = StreamingContext(sc, 1)

data1 = [
    ('09', [25, 66, 67]),
    ('17', [66, 67, 39]),
    ('04', [25]),
    ('08', [120, 122]),
    ('28', [25, 67]),
    ('30', [122])
]
df1Columns = ["id", "list"]
df1 = sqlContext.createDataFrame(data=data1, schema=df1Columns)
df1.show(20, truncate=False)

uniqueRDD = df1.rdd

# The maximum list length is computed once on the driver and then
# captured in the filter closures below.
max_len = uniqueRDD.map(lambda x: len(x[1])).max()
maxRDD = uniqueRDD.filter(lambda x: len(x[1]) == max_len)
print("printing out maxlength = ", max_len)

dStream = ssc.queueStream([uniqueRDD])
resultStream = dStream.filter(lambda x: len(x[1]) == max_len)
print("Printing the filtered streaming result")

def printResultStream(rdd):
    mylist = rdd.collect()
    for ele in mylist:
        print(ele)

resultStream.foreachRDD(printResultStream)
ssc.start()
ssc.awaitTermination()
ssc.stop()
Here is the output:
+---+------------+
|id |list |
+---+------------+
|09 |[25, 66, 67]|
|17 |[66, 67, 39]|
|04 |[25] |
|08 |[120, 122] |
|28 |[25, 67] |
|30 |[122] |
+---+------------+
printing out maxlength = 3
Printing the filtered streaming result
Row(id='09', list=[25, 66, 67])
Row(id='17', list=[66, 67, 39])
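Side note: in the script above, awaitTermination() blocks until the streaming context is stopped, so the ssc.stop() call after it is never reached. For a finite test one could bound the wait instead (a sketch; the 10-second timeout is arbitrary):

ssc.start()
ssc.awaitTerminationOrTimeout(10)  # seconds
ssc.stop(stopSparkContext=True, stopGraceFully=False)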
You could try something like this:
dStream = ssc.queueStream([uniqueRDD, uniqueRDD, uniqueRDD])

def maxOverRDD(input_rdd):
    # Recompute the maximum list length for each batch, then keep only
    # the rows that reach it.
    if not input_rdd.isEmpty():
        reduced_rdd = input_rdd.reduce(lambda acc, value: value if len(acc[1]) < len(value[1]) else acc)
        internal_result = input_rdd.filter(lambda x: len(x[1]) == len(reduced_rdd[1]))
        return internal_result
    # transform() must return an RDD, so pass empty batches through unchanged.
    return input_rdd

result = dStream.transform(maxOverRDD)

print("Printing the finalStream")
result.foreachRDD(printResultStream)
The output would look like this (it repeats because the same RDD is fed into the stream three times):
Printing the finalStream
Row(id='09', list=[25, 66, 67])
Row(id='17', list=[66, 67, 39])
Row(id='09', list=[25, 66, 67])
Row(id='17', list=[66, 67, 39])
Row(id='09', list=[25, 66, 67])
Row(id='17', list=[66, 67, 39])
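To see that the maximum really is recomputed per batch, one could queue two different RDDs instead of the same one three times (a hypothetical variation; rdd_a and rdd_b are invented here):

# Each batch gets its own max length: 3 for the first RDD, 2 for the second.
rdd_a = sc.parallelize([('01', [1, 2, 3]), ('02', [4])])
rdd_b = sc.parallelize([('03', [5, 6]), ('04', [7])])
varStream = ssc.queueStream([rdd_a, rdd_b])
varStream.transform(maxOverRDD).foreachRDD(printResultStream)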