Spark RDD:设置差异
Spark RDD: set difference
val data: RDD [(String, Array[Int])] = sc.parallelize(Seq(
("100",Array(1, 2, 3, 4, 5)), ("1000",Array(10, 11, 12, 13, 14))
))
val codes = sc.parallelize(Seq(2, 3, 12, 13))
val result = data.map {case (id,values) => (id, values.diff(codes))}
我想得到如下结果:
val result: Array[(String, Array[Int])] = Array(
("100", Array(1, 4, 5)), ("1000", Array(10, 11, 14))
)
但是,当我进行设置差异时,出现类型不匹配错误。
如果您想在本地数据结构上应用操作,则没有理由并行化 codes
。就像这样 mapValues
:
val codes = Seq(2, 3, 12, 13)
val result = data.mapValues(_.diff(codes))
如果代码不适合内存,您将不得不做一些稍微复杂的事情:
// Add dummy values to codes
val codes = sc.parallelize(Seq(2, 3, 12, 13)).map((_, null))
data
.flatMapValues(x => x) // Flatten values (k, vs) => (k, v)
.map(_.swap) // Swap order => (v, k)
.subtractByKey(codes) // Difference
.map(_.swap) // Swap order => (k, v)
.groupByKey // Group => (k, vs)
val data: RDD [(String, Array[Int])] = sc.parallelize(Seq(
("100",Array(1, 2, 3, 4, 5)), ("1000",Array(10, 11, 12, 13, 14))
))
val codes = sc.parallelize(Seq(2, 3, 12, 13))
val result = data.map {case (id,values) => (id, values.diff(codes))}
我想得到如下结果:
val result: Array[(String, Array[Int])] = Array(
("100", Array(1, 4, 5)), ("1000", Array(10, 11, 14))
)
但是,当我进行设置差异时,出现类型不匹配错误。
如果您想在本地数据结构上应用操作,则没有理由并行化 codes
。就像这样 mapValues
:
val codes = Seq(2, 3, 12, 13)
val result = data.mapValues(_.diff(codes))
如果代码不适合内存,您将不得不做一些稍微复杂的事情:
// Add dummy values to codes
val codes = sc.parallelize(Seq(2, 3, 12, 13)).map((_, null))
data
.flatMapValues(x => x) // Flatten values (k, vs) => (k, v)
.map(_.swap) // Swap order => (v, k)
.subtractByKey(codes) // Difference
.map(_.swap) // Swap order => (k, v)
.groupByKey // Group => (k, vs)