遍历配对 RDD (Pyspark) 的值并替换空值
Iterating through values of a paired RDD (Pyspark) and replacing null values
我正在使用 Spark RDD API 收集数据并创建了一个配对的 RDD,如下所示:
spark = SparkSession.builder.master('local').appName('app').getOrCreate()
sc = spark.sparkContext
raw_rdd = sc.textFile("data.csv")
paired_rdd = raw_rdd\
.map(lambda x: x.split(","))\
.map(lambda x: (x[2], [x[1], x[3],x[5]]))
这里是配对 RDD 的示例摘录:
[('VXIO456XLBB630221', ['I', 'Nissan', '2003']),
('VXIO456XLBB630221', ['A', '', '']),
('VXIO456XLBB630221', ['R', '', '']),
('VXIO456XLBB630221', ['R', '', ''])]
如你所见,这个成对的 RDD 中的键对于所有元素都是相同的,但是只有一个元素完成了所有字段。
我们想要完成什么?我们想用具有完整字段的元素的值替换空字段。所以我们会有这样的预期输出:
[('VXIO456XLBB630221', ['I', 'Nissan', '2003']),
('VXIO456XLBB630221', ['A', 'Nissan', '2003']),
('VXIO456XLBB630221', ['R', 'Nissan', '2003']),
('VXIO456XLBB630221', ['R', 'Nissan', '2003'])]
我知道第一步是 groupByKey
,即
paired_rdd.groupByKey().map(lambda kv: ____)
我只是不确定如何遍历这些值以及这如何适合一个 lambda 函数。
最好的方法可能是使用数据帧和 window 函数。使用 RDD,您也可以使用聚合 (reduceByKey
) 来解决问题,该聚合将填充空白并在内存中保留列表的第一个元素的列表。然后我们可以根据该内存重新展平以创建与以前相同数量的行,但填充了值。
# let's define a function that selects the none empty values between two strings
def select_value(a, b):
if a is None or len(a) == 0:
return b
else:
return a
# let's use mapValues to separate the first element of the list and the rest
# Then we use reduceByKey to aggregate the list of all first elements (first
# element of the tuple). For the other elements, we only keep non empty values
# (second element of the tuple).
# Finally, we use flatMapValues to recreate the rows based on the memorized
# first elements of the lists.
paired_rdd\
.mapValues(lambda x: ([x[0]], x[1:]))\
.reduceByKey(lambda a, b: (
a[0] + b[0],
[select_value(a[1][i], b[1][i]) for i in range(len(a[1])) ]
) )\
.flatMapValues(lambda x: [[k] + x[1] for k in x[0]])\
.collect()
产生:
[('VXIO456XLBB630221', ['I', 'Nissan', '2003']),
('VXIO456XLBB630221', ['A', 'Nissan', '2003']),
('VXIO456XLBB630221', ['R', 'Nissan', '2003']),
('VXIO456XLBB630221', ['R', 'Nissan', '2003'])
]
我正在使用 Spark RDD API 收集数据并创建了一个配对的 RDD,如下所示:
spark = SparkSession.builder.master('local').appName('app').getOrCreate()
sc = spark.sparkContext
raw_rdd = sc.textFile("data.csv")
paired_rdd = raw_rdd\
.map(lambda x: x.split(","))\
.map(lambda x: (x[2], [x[1], x[3],x[5]]))
这里是配对 RDD 的示例摘录:
[('VXIO456XLBB630221', ['I', 'Nissan', '2003']),
('VXIO456XLBB630221', ['A', '', '']),
('VXIO456XLBB630221', ['R', '', '']),
('VXIO456XLBB630221', ['R', '', ''])]
如你所见,这个成对的 RDD 中的键对于所有元素都是相同的,但是只有一个元素完成了所有字段。
我们想要完成什么?我们想用具有完整字段的元素的值替换空字段。所以我们会有这样的预期输出:
[('VXIO456XLBB630221', ['I', 'Nissan', '2003']),
('VXIO456XLBB630221', ['A', 'Nissan', '2003']),
('VXIO456XLBB630221', ['R', 'Nissan', '2003']),
('VXIO456XLBB630221', ['R', 'Nissan', '2003'])]
我知道第一步是 groupByKey
,即
paired_rdd.groupByKey().map(lambda kv: ____)
我只是不确定如何遍历这些值以及这如何适合一个 lambda 函数。
最好的方法可能是使用数据帧和 window 函数。使用 RDD,您也可以使用聚合 (reduceByKey
) 来解决问题,该聚合将填充空白并在内存中保留列表的第一个元素的列表。然后我们可以根据该内存重新展平以创建与以前相同数量的行,但填充了值。
# let's define a function that selects the none empty values between two strings
def select_value(a, b):
if a is None or len(a) == 0:
return b
else:
return a
# let's use mapValues to separate the first element of the list and the rest
# Then we use reduceByKey to aggregate the list of all first elements (first
# element of the tuple). For the other elements, we only keep non empty values
# (second element of the tuple).
# Finally, we use flatMapValues to recreate the rows based on the memorized
# first elements of the lists.
paired_rdd\
.mapValues(lambda x: ([x[0]], x[1:]))\
.reduceByKey(lambda a, b: (
a[0] + b[0],
[select_value(a[1][i], b[1][i]) for i in range(len(a[1])) ]
) )\
.flatMapValues(lambda x: [[k] + x[1] for k in x[0]])\
.collect()
产生:
[('VXIO456XLBB630221', ['I', 'Nissan', '2003']),
('VXIO456XLBB630221', ['A', 'Nissan', '2003']),
('VXIO456XLBB630221', ['R', 'Nissan', '2003']),
('VXIO456XLBB630221', ['R', 'Nissan', '2003'])
]