比较两个 RDD

Comparing two RDDs

我有两个 RDD[Array[String]],我们称它们为 rdd1 和 rdd2。 我会创建一个新的 RDD,只包含 rdd2 的条目,而不是 rdd1 中的条目(基于密钥)。 我通过 Intellij 在 Scala 上使用 Spark。

我按键将 rdd1 和 rdd2 分组(我将只比较两个 rdd 的键):

val rdd1Grouped = rdd1.groupBy(line => line(0))
val rdd2Grouped = rdd2.groupBy(line => line(0))

然后,我用了一个leftOuterJoin

val output = rdd1Grouped.leftOuterJoin(rdd2Grouped).collect {
  case (k, (v, None)) => (k, v)
}

但这似乎没有给出正确的结果。

有什么问题吗?有什么建议吗?

RDDS 示例(每一行都是一个 Array[String],ofc):

rdd1                        rdd2                  output (in some form)

1,18/6/2016               2,9/6/2016                  2,9/6/2016
1,18/6/2016               2,9/6/2016 
1,18/6/2016               2,9/6/2016
1,18/6/2016               2,9/6/2016
1,18/6/2016               1,20/6/2016
3,18/6/2016               1,20/6/2016 
3,18/6/2016               1,20/6/2016
3,18/6/2016
3,18/6/2016
3,18/6/2016

在这种情况下,我只想添加条目“2,9/6/2016”,因为键“2”不在 rdd1 中。

new RDD containing just the entries of rdd2 not in rdd1

left join 将保留 rdd1 中的所有键并附加 RDD2 匹配键值的列。所以很明显 left join/outer join 不是解决办法。

rdd1Grouped.subtractByKey(rdd2Grouped) 适合您的情况。

P.S。 :另请注意,如果 rdd1 较小,则更好地广播它。这样,在减法时只有第二个 rdd 会被流式传输。

切换rdd1Groupedrdd2Grouped,然后使用filter:

val output = rdd2Grouped.leftOuterJoin(rdd1Grouped).filter( line => {
  line._2._2.isEmpty
}).collect