比较两个 RDD
Comparing two RDDs
我有两个 RDD[Array[String]],我们称它们为 rdd1 和 rdd2。
我会创建一个新的 RDD,只包含 rdd2 的条目,而不是 rdd1 中的条目(基于密钥)。
我通过 Intellij 在 Scala 上使用 Spark。
我按键将 rdd1 和 rdd2 分组(我将只比较两个 rdd 的键):
val rdd1Grouped = rdd1.groupBy(line => line(0))
val rdd2Grouped = rdd2.groupBy(line => line(0))
然后,我用了一个leftOuterJoin
:
val output = rdd1Grouped.leftOuterJoin(rdd2Grouped).collect {
case (k, (v, None)) => (k, v)
}
但这似乎没有给出正确的结果。
有什么问题吗?有什么建议吗?
RDDS 示例(每一行都是一个 Array[String],ofc):
rdd1 rdd2 output (in some form)
1,18/6/2016 2,9/6/2016 2,9/6/2016
1,18/6/2016 2,9/6/2016
1,18/6/2016 2,9/6/2016
1,18/6/2016 2,9/6/2016
1,18/6/2016 1,20/6/2016
3,18/6/2016 1,20/6/2016
3,18/6/2016 1,20/6/2016
3,18/6/2016
3,18/6/2016
3,18/6/2016
在这种情况下,我只想添加条目“2,9/6/2016”,因为键“2”不在 rdd1 中。
new RDD containing just the entries of rdd2 not in rdd1
left join 将保留 rdd1 中的所有键并附加 RDD2 匹配键值的列。所以很明显 left join/outer join 不是解决办法。
rdd1Grouped.subtractByKey(rdd2Grouped)
适合您的情况。
P.S。 :另请注意,如果 rdd1 较小,则更好地广播它。这样,在减法时只有第二个 rdd 会被流式传输。
切换rdd1Grouped
和rdd2Grouped
,然后使用filter
:
val output = rdd2Grouped.leftOuterJoin(rdd1Grouped).filter( line => {
line._2._2.isEmpty
}).collect
我有两个 RDD[Array[String]],我们称它们为 rdd1 和 rdd2。 我会创建一个新的 RDD,只包含 rdd2 的条目,而不是 rdd1 中的条目(基于密钥)。 我通过 Intellij 在 Scala 上使用 Spark。
我按键将 rdd1 和 rdd2 分组(我将只比较两个 rdd 的键):
val rdd1Grouped = rdd1.groupBy(line => line(0))
val rdd2Grouped = rdd2.groupBy(line => line(0))
然后,我用了一个leftOuterJoin
:
val output = rdd1Grouped.leftOuterJoin(rdd2Grouped).collect {
case (k, (v, None)) => (k, v)
}
但这似乎没有给出正确的结果。
有什么问题吗?有什么建议吗?
RDDS 示例(每一行都是一个 Array[String],ofc):
rdd1 rdd2 output (in some form)
1,18/6/2016 2,9/6/2016 2,9/6/2016
1,18/6/2016 2,9/6/2016
1,18/6/2016 2,9/6/2016
1,18/6/2016 2,9/6/2016
1,18/6/2016 1,20/6/2016
3,18/6/2016 1,20/6/2016
3,18/6/2016 1,20/6/2016
3,18/6/2016
3,18/6/2016
3,18/6/2016
在这种情况下,我只想添加条目“2,9/6/2016”,因为键“2”不在 rdd1 中。
new RDD containing just the entries of rdd2 not in rdd1
left join 将保留 rdd1 中的所有键并附加 RDD2 匹配键值的列。所以很明显 left join/outer join 不是解决办法。
rdd1Grouped.subtractByKey(rdd2Grouped)
适合您的情况。
P.S。 :另请注意,如果 rdd1 较小,则更好地广播它。这样,在减法时只有第二个 rdd 会被流式传输。
切换rdd1Grouped
和rdd2Grouped
,然后使用filter
:
val output = rdd2Grouped.leftOuterJoin(rdd1Grouped).filter( line => {
line._2._2.isEmpty
}).collect