替换 RDD 的一些元素

Replace some elements of an RDD

我有这个文件,其中包含三种类型的数据,比如 A、B、C。我只想根据某些条件用其他 RDD(称为 RDD2)的元素替换此 RDD(称为 RDD1)的 A 型元素。RDD1 和 RDD2 有一些共同的字符串。

文件结构 [RDD[String]]

1 A 2   
2 B 12 13 4
2 C 67 29  
2 A 5

RDD2 [RDD[行]]

1 A 2 5 6
2 A 5 7 8 

我正在过滤第一个 RDD 并向其附加一些字符串以创建 RDD2。

我知道 RDD 是不可变的,但由于 map 函数不接受 arguments.I 我想知道是否有办法实现这一点。

编辑:考虑以下评论

示例输出 RDD[任意]

1 A 2 5 6
2 B 12 13 4
2 C 67 29
2 A 5 7 8

对我有用

val rdd1 = sc.parallelize(Seq(List("1", "A", "2"), List("2", "B", "12", "13", "4"), List("2", "C", "67", "29"), List("2", "A", "5")))
val rdd2 = sc.parallelize(Seq(List("1", "A", "2", "5", "6"), List("2", "A", "5", "7", "8")))
rdd1.map(row =>//where row(0), row(1) is your condition
  ((row(0), row(1)), row)).leftOuterJoin(rdd2.map(row =>
  ((row(0), row(1)), row))).map(row => {
  row._2._2.getOrElse(row._2._1)
}).foreach(println)

您应该考虑在 RDD 中使用更合适和明确的数据结构,例如(键,值)对的 RDD。

然后您可以利用该键在您的 RDD1 和 RDD2 之间执行连接 "a la SQL"。我相信这就是 Gabber 上面已经在做的事情,但是使用了 Scala 语法糖的全部功能。

以更明确的方式: 您的初始 RDD,如 Gabber:

val rdd1 = sc.parallelize(Seq(List("1", "A", "2"), List("2", "B", "12", "13", "4"), List("2", "C", "67", "29"), List("2", "A", "5")))
val rdd2 = sc.parallelize(Seq(List("1", "A", "2", "5", "6"), List("2", "A", "5", "7", "8")))

然后用映射创建一个(键,值)对的RDD,其中键将用于满足您的匹配条件(在您的示例中,您的键似乎是前两个元素ex。(1,A ))

val rdd1KeyValue = rdd1.map(row => ((row(0),row(1)), row)
val rdd2KeyValue = rdd2.map(row => ((row(0),row(1)), row))

现在,由于您要对具有键 "A" 的值执行 "join" 并保留其他不匹配的值,因此这是一个 SQL 左外连接。所以:

val resultRaw = rdd1KeyValue.leftOuterJoin(rdd2KeyValue)

但 resultRaw 现在是这样的:

((2,C),(List(2, C, 67, 29),None))
((1,A),(List(1, A, 2),Some(List(1, A, 2, 5, 6))))
((2,B),(List(2, B, 12, 13, 4),None))
((2,A),(List(2, A, 5),Some(List(2, A, 5, 7, 8))))

因此,要获取最终结果,您需要再次映射到您需要的 "pick"(._1 运算符是获取(键,值)对的第一个值):

val resFinal = result.map(row => row._2._2.getOrElse(row._2._1))

以我为例,最终结果是:

List(1, A, 2, 5, 6)
List(2, B, 12, 13, 4)
List(2, A, 5, 7, 8)
List(2, C, 67, 29)