仅当一个字段 rdd 存在于 rdd 的第二个字段中时,如何 select 值

How to select values from one field rdd only if it is present in second field of rdd

我有一个包含 3 个字段的 rdd,如下所述。

1,2,6
2,4,6
1,4,9
3,4,7
2,3,8

现在,从上面的rdd,我想得到下面的rdd。

2,4,6
3,4,7
2,3,8

结果rdd没有以1开头的行,因为1在输入rdd的第二个字段中不存在。

好的,如果我理解正确的话,有两种方法:

  1. 将您的 RDD 分成两部分,其中第一个 RDD 包含 "second field" 的唯一值,第二个 RDD 具有 "first value" 作为键。然后将rdds连接在一起。这种方法的缺点是 distinctjoin 是缓慢的操作。

    val r: RDD[(String, String, Int)] = sc.parallelize(Seq(
      ("1", "2", 6),
      ("2", "4", 6),
      ("1", "4", 9),
      ("3", "4", 7),
      ("2", "3", 8)
    ))
    
    val uniqueValues: RDD[(String, Unit)] = r.map(x => x._2 -> ()).distinct
    val r1: RDD[(String, (String, String, Int))] = r.map(x => x._1 -> x)
    
    val result: RDD[(String, String, Int)] = r1.join(uniqueValues).map {case (_, (x, _)) => x}
    
    result.collect.foreach(println)
    
  2. 如果您的 RDD 相对较小并且 Set 个第二个值可以完全放入所有节点的内存中,那么您可以创建该内存中集作为第一步,广播它到所有节点,然后只过滤你的 RDD:

    val r: RDD[(String, String, Int)] = sc.parallelize(Seq(
      ("1", "2", 6),
      ("2", "4", 6),
      ("1", "4", 9),
      ("3", "4", 7),
      ("2", "3", 8)
    ))
    
    val uniqueValues = sc.broadcast(r.map(x => x._2).distinct.collect.toSet)
    
    val result: RDD[(String, String, Int)] = r.filter(x => uniqueValues.value.contains(x._1))
    
    result.collect.foreach(println)
    

两个示例输出:

(2,4,6)
(2,3,8)
(3,4,7)