RDD 按键删除元素

RDD Remove elements by key

我有 2 个 RDD,它们使用以下代码引入:

val fileA = sc.textFile("fileA.txt")
val fileB = sc.textFile("fileB.txt")

然后我按键映射和缩减它:

val countsB = fileB.flatMap(line => line.split("\n"))
  .map(word => (word, 1))
  .reduceByKey(_+_)

val countsA = fileA.flatMap(line => line.split("\n"))
  .map(word => (word, 1))
  .reduceByKey(_+_)

如果 countA 中存在键,我现在不想查找和删除 countB 中的所有键

我试过类似的方法:

countsB.keys.foreach(b => {
  if(countsB.collect().exists(_ == b)){
    countsB.collect().drop(countsB.collect().indexOf(b))
  }
})

但似乎并不能一键删除它们。

您建议的代码存在 3 个问题:

  1. 您正在 collecting RDD,这意味着它们不再是 RDD,它们作为普通 Scala 集合被复制到驱动程序应用程序的内存中,因此您失去了 Spark 的并行性并有 OutOfMemory 的风险如果您的数据集很大,则会出错

  2. 在不可变的 Scala 集合(或 RDD)上调用 drop 时,您不会更改原始集合,您会得到一个 new 删除了这些记录的集合,因此您不能指望原始集合会发生变化

  3. 您不能在传递给任何 RDD 高阶方法的函数中访问 RDD(例如,在本例中为 foreach)- 任何传递给这些方法的函数被序列化并发送给工作人员,并且 RDDs(有意)不可序列化 - 将它们提取到驱动程序内存中,序列化并发送回工作人员是没有意义的 - 数据已经分发给工作人员!

为了解决所有这些问题——当你想将一个 RDD 的数据用于 transform/filter 另一个 RDD 时,你通常需要使用某种类型的 join .在这种情况下你可以这样做:

// left join, and keep only records for which there was NO match in countsA:
countsB.leftOuterJoin(countsA).collect { case (key, (valueB, None)) => (key, valueB) }

请注意,我在这里使用的 collect 不是您使用的 collect - 这个以 PartialFunction 作为参数,其行为类似于mapfilter,最重要的是:它不会将所有数据复制到驱动程序内存中。

编辑:正如 The Archetypal Paul 评论的那样 - 你有一个更短更好的选择 - subtractByKey:

countsB.subtractByKey(countsA)