Spark 1.5.1、Scala 2.10.5:如何扩展 RDD[Array[String]、Vector]

Spark 1.5.1, Scala 2.10.5: how to expand an RDD[Array[String], Vector]

我正在使用 Spark 1.5.1 和 Scala 2.10.5

我对 RDD 的每个元素都有一个 RDD[Array[String], Vector]

目标是通过构建元组的 RDD 来结束:RDD[(String, Vector)],这个 RDD 包含在上一步中创建的所有元组。

谢谢

你试过这个吗?

// rdd: RDD[Array[String], Vector] - initial RDD
val new_rdd = rdd
  .flatMap {
    case (array: Array[String], vec: Vector) => array.map(str => (str, vec))
  }

玩具示例(我是 运行 它在 spark-shell 中):

val rdd = sc.parallelize(Array((Array("foo", "bar"), 100), (Array("one", "two"), 200)))
val new_rdd = rdd
  .map {
    case (array: Array[String], vec: Int) => array.map(str => (str, vec))
  }
  .flatMap(arr => arr)
new_rdd.collect
res14: Array[(String, Int)] = Array((foo,100), (bar,100), (one,200), (two,200))

考虑一下:

rdd.flatMap { case (arr, vec) => arr.map( (s) => (s, vec) ) }

(第一个 flatMap 让你得到一个 RDD[(String, Vector)] 作为输出,而不是一个 map ,它会给你一个 RDD[Array[(String, Vector)]]