遍历配对 RDD (Pyspark) 的值并替换空值

Iterating through values of a paired RDD (Pyspark) and replacing null values

我正在使用 Spark RDD API 收集数据并创建了一个配对的 RDD,如下所示:

spark = SparkSession.builder.master('local').appName('app').getOrCreate()
sc = spark.sparkContext
raw_rdd = sc.textFile("data.csv")

paired_rdd = raw_rdd\
    .map(lambda x: x.split(","))\
    .map(lambda x: (x[2], [x[1], x[3],x[5]]))

这里是配对 RDD 的示例摘录:

[('VXIO456XLBB630221', ['I', 'Nissan', '2003']),
 ('VXIO456XLBB630221', ['A', '', '']),
 ('VXIO456XLBB630221', ['R', '', '']),
 ('VXIO456XLBB630221', ['R', '', ''])]

如你所见,这个成对的 RDD 中的键对于所有元素都是相同的,但是只有一个元素完成了所有字段。

我们想要完成什么?我们想用具有完整字段的元素的值替换空字段。所以我们会有这样的预期输出:

[('VXIO456XLBB630221', ['I', 'Nissan', '2003']),
 ('VXIO456XLBB630221', ['A', 'Nissan', '2003']), 
 ('VXIO456XLBB630221', ['R', 'Nissan', '2003']), 
 ('VXIO456XLBB630221', ['R', 'Nissan', '2003'])]

我知道第一步是 groupByKey,即

paired_rdd.groupByKey().map(lambda kv: ____)

我只是不确定如何遍历这些值以及这如何适合一个 lambda 函数。

最好的方法可能是使用数据帧和 window 函数。使用 RDD,您也可以使用聚合 (reduceByKey) 来解决问题,该聚合将填充空白并在内存中保留列表的第一个元素的列表。然后我们可以根据该内存重新展平以创建与以前相同数量的行,但填充了值。

# let's define a function that selects the none empty values between two strings
def select_value(a, b):
    if a is None or len(a) == 0:
         return b
    else:
         return a

# let's use mapValues to separate the first element of the list and the rest
# Then we use reduceByKey to aggregate the list of all first elements (first
# element of the tuple). For the other elements, we only keep non empty values
# (second element of the tuple).
# Finally, we use flatMapValues to recreate the rows based on the memorized
# first elements of the lists.
paired_rdd\
    .mapValues(lambda x: ([x[0]], x[1:]))\
    .reduceByKey(lambda a, b: (
            a[0] + b[0],
            [select_value(a[1][i], b[1][i]) for i in range(len(a[1])) ] 
     ) )\
    .flatMapValues(lambda x: [[k] + x[1]  for k in x[0]])\
    .collect()

产生:

[('VXIO456XLBB630221', ['I', 'Nissan', '2003']),
 ('VXIO456XLBB630221', ['A', 'Nissan', '2003']),
 ('VXIO456XLBB630221', ['R', 'Nissan', '2003']),
 ('VXIO456XLBB630221', ['R', 'Nissan', '2003'])
]