从另一个元组列表中过滤元组的 RDD

Filter RDD of tuples from another list of tuples

我目前正在使用 Spark 和 Scalacheck,我正在尝试过滤 RDD[(A,Long)] ( 其中 A 是从 Avro 文件读取的寄存器,Long 是从zipWithUniqueId() 函数 ) 来自存储在缓冲区中的相同 RDD 的样本。

我的目的是测试该样本的某些属性,一旦失败,在该 RDD 的样本中再次测试 属性,该 RDD 不包含之前采样的任何值。 我将 rdd 存储在一个 var 中,这样我就可以在过滤后重新分配它。 我的代码是这样的:

val samplingSeed = new Random(System.currentTimeMillis()).nextLong()
val sampled = rdd.takeSample(withReplacement = false, bufferSize, samplingSeed)
val buffer: JQueue[(A, Long)] = new JConcurrentLinkedQueue[(A, Long)]

//Sampled as Array converts to queue
for (i <- 0 to sampled.length - 1)
 buffer.add(sampled(i).asInstanceOf[(A, Long)])

//rdd is assigned to a var for persistence
//filter here and leave out all the tuples in buffer based in the 
//Long  value in each tuple
 rdd= rdd.filter{foo}

我怎样才能做到这一点?

一般情况下,可以使用广播变量按集合过滤:

val rdd = sc.parallelize((1 to 10).toSeq)
val ids = sc.broadcast(Set(1, 2, 3))
rdd.filter(v => !ids.value.contains(v)).collect()
res1: Array[Int] = Array(4, 5, 6, 7, 8, 9, 10)