RDD 按键删除元素
RDD Remove elements by key
我有 2 个 RDD,它们使用以下代码引入:
val fileA = sc.textFile("fileA.txt")
val fileB = sc.textFile("fileB.txt")
然后我按键映射和缩减它:
val countsB = fileB.flatMap(line => line.split("\n"))
.map(word => (word, 1))
.reduceByKey(_+_)
val countsA = fileA.flatMap(line => line.split("\n"))
.map(word => (word, 1))
.reduceByKey(_+_)
如果 countA 中存在键,我现在不想查找和删除 countB 中的所有键
我试过类似的方法:
countsB.keys.foreach(b => {
if(countsB.collect().exists(_ == b)){
countsB.collect().drop(countsB.collect().indexOf(b))
}
})
但似乎并不能一键删除它们。
您建议的代码存在 3 个问题:
您正在 collect
ing RDD,这意味着它们不再是 RDD,它们作为普通 Scala 集合被复制到驱动程序应用程序的内存中,因此您失去了 Spark 的并行性并有 OutOfMemory 的风险如果您的数据集很大,则会出错
在不可变的 Scala 集合(或 RDD
)上调用 drop
时,您不会更改原始集合,您会得到一个 new 删除了这些记录的集合,因此您不能指望原始集合会发生变化
您不能在传递给任何 RDD 高阶方法的函数中访问 RDD
(例如,在本例中为 foreach
)- 任何传递给这些方法的函数被序列化并发送给工作人员,并且 RDD
s(有意)不可序列化 - 将它们提取到驱动程序内存中,序列化并发送回工作人员是没有意义的 - 数据已经分发给工作人员!
为了解决所有这些问题——当你想将一个 RDD 的数据用于 transform/filter 另一个 RDD 时,你通常需要使用某种类型的 join
.在这种情况下你可以这样做:
// left join, and keep only records for which there was NO match in countsA:
countsB.leftOuterJoin(countsA).collect { case (key, (valueB, None)) => (key, valueB) }
请注意,我在这里使用的 collect
不是您使用的 collect
- 这个以 PartialFunction
作为参数,其行为类似于map
和 filter
,最重要的是:它不会将所有数据复制到驱动程序内存中。
编辑:正如 The Archetypal Paul 评论的那样 - 你有一个更短更好的选择 - subtractByKey
:
countsB.subtractByKey(countsA)
我有 2 个 RDD,它们使用以下代码引入:
val fileA = sc.textFile("fileA.txt")
val fileB = sc.textFile("fileB.txt")
然后我按键映射和缩减它:
val countsB = fileB.flatMap(line => line.split("\n"))
.map(word => (word, 1))
.reduceByKey(_+_)
val countsA = fileA.flatMap(line => line.split("\n"))
.map(word => (word, 1))
.reduceByKey(_+_)
如果 countA 中存在键,我现在不想查找和删除 countB 中的所有键
我试过类似的方法:
countsB.keys.foreach(b => {
if(countsB.collect().exists(_ == b)){
countsB.collect().drop(countsB.collect().indexOf(b))
}
})
但似乎并不能一键删除它们。
您建议的代码存在 3 个问题:
您正在
collect
ing RDD,这意味着它们不再是 RDD,它们作为普通 Scala 集合被复制到驱动程序应用程序的内存中,因此您失去了 Spark 的并行性并有 OutOfMemory 的风险如果您的数据集很大,则会出错在不可变的 Scala 集合(或
RDD
)上调用drop
时,您不会更改原始集合,您会得到一个 new 删除了这些记录的集合,因此您不能指望原始集合会发生变化您不能在传递给任何 RDD 高阶方法的函数中访问
RDD
(例如,在本例中为foreach
)- 任何传递给这些方法的函数被序列化并发送给工作人员,并且RDD
s(有意)不可序列化 - 将它们提取到驱动程序内存中,序列化并发送回工作人员是没有意义的 - 数据已经分发给工作人员!
为了解决所有这些问题——当你想将一个 RDD 的数据用于 transform/filter 另一个 RDD 时,你通常需要使用某种类型的 join
.在这种情况下你可以这样做:
// left join, and keep only records for which there was NO match in countsA:
countsB.leftOuterJoin(countsA).collect { case (key, (valueB, None)) => (key, valueB) }
请注意,我在这里使用的 collect
不是您使用的 collect
- 这个以 PartialFunction
作为参数,其行为类似于map
和 filter
,最重要的是:它不会将所有数据复制到驱动程序内存中。
编辑:正如 The Archetypal Paul 评论的那样 - 你有一个更短更好的选择 - subtractByKey
:
countsB.subtractByKey(countsA)