Spark RDD:设置差异

Spark RDD: set difference

val data: RDD [(String, Array[Int])] = sc.parallelize(Seq(
  ("100",Array(1, 2, 3, 4, 5)), ("1000",Array(10, 11, 12, 13, 14))
))

val codes = sc.parallelize(Seq(2, 3, 12, 13))

val result = data.map {case (id,values) => (id, values.diff(codes))}

我想得到如下结果:

val result: Array[(String, Array[Int])] = Array(
  ("100", Array(1, 4, 5)), ("1000", Array(10, 11, 14))
)

但是,当我进行设置差异时,出现类型不匹配错误。

如果您想在本地数据结构上应用操作,则没有理由并行化 codes。就像这样 mapValues

val codes = Seq(2, 3, 12, 13)
val result = data.mapValues(_.diff(codes))

如果代码不适合内存,您将不得不做一些稍微复杂的事情:

// Add dummy values to codes
val codes = sc.parallelize(Seq(2, 3, 12, 13)).map((_, null))

data
  .flatMapValues(x => x)  // Flatten values (k, vs) => (k, v)
  .map(_.swap) // Swap order => (v, k)
  .subtractByKey(codes) // Difference
  .map(_.swap) // Swap order => (k, v)
  .groupByKey  // Group => (k, vs)