用另一个替换 RDD 的值

Replacing the values of an RDD with another

我有如下两个数据集。每个数据集每行用“,”分隔数字。

Dataset 1

1,2,0,8,0

2,0,9,0,3

Dataset 2

7,5,4,6,3

4,9,2,1,8

我必须用数据集 2 中的相应值替换第一个数据集的零。

所以结果应该是这样的

1,2,4,8,3

2,9,9,1,3

我用下面的代码替换了这些值。

val rdd1 = sc.textFile(dataset1).flatMap(l => l.split(","))
val rdd2 = sc.textFile(dataset2).flatMap(l => l.split(","))
val result = rdd1.zip(rdd2).map( x => if(x._1 == "0") x._2 else x._1)

我得到的输出格式是RDD[String]。但我需要 RDD[Array[String]] 格式的输出,因为这种格式更适合我的进一步转换。

如果你想要一个RDD[Array[String]],其中数组的每个元素对应一行,拆分后不要平面映射值,只映射它们。

scala> val rdd1 = sc.parallelize(List("1,2,0,8,0", "2,0,9,0,3")).map(l => l.split(","))
rdd1: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[1] at map at <console>:27

scala> val rdd2 = sc.parallelize(List("7,5,4,6,3", "4,9,2,1,8")).map(l => l.split(","))
rdd2: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[3] at map at <console>:27

scala> val result = rdd1.zip(rdd2).map{case(arr1, arr2) => arr1.zip(arr2).map{case(v1, v2) => if(v1 == "0") v2 else v1}}
result: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[5] at map at <console>:31

scala> result.collect
res0: Array[Array[String]] = Array(Array(1, 2, 4, 8, 3), Array(2, 9, 9, 1, 3))

或者可能不那么冗长:

val result = rdd1.zip(rdd2).map(t => t._1.zip(t._2).map(x => if(x._1 == "0") x._2 else x._1))