如何拆分 Spark rdd Array[(String, Array[String])]?

How do I split a Spark rdd Array[(String, Array[String])]?

我正在练习如何在 Spark 中进行排序 shell。我有一个大约有 10 columns/variables 的 rdd。我想根据第 7 列的值对整个 rdd 进行排序。

rdd
org.apache.spark.rdd.RDD[Array[String]] = ...

据我所知,这样做的方法是使用 sortByKey,而这又只适用于成对。所以我映射了它,所以我有一对由 column7 (字符串值)和完整的原始 rdd (字符串数组)组成

rdd2 = rdd.map(c => (c(7),c))
rdd2: org.apache.spark.rdd.RDD[(String, Array[String])] = ...

然后我应用sortByKey,还是没问题...

rdd3 = rdd2.sortByKey()
rdd3: org.apache.spark.rdd.RDD[(String, Array[String])] = ...

但是现在我如何从 rdd3 (Array[String]) 中分离、收集和保存排序的原始 rdd?每当我尝试在 rdd3 上进行拆分时,它都会给我一个错误:

val rdd4 = rdd3.map(_.split(',')(2))
<console>:33: error: value split is not a member of (String, Array[String])

我在这里做错了什么?是否有其他更好的方法对 rdd 的其中一列进行排序?

我还以为你不熟悉 Scala, 所以,下面应该可以帮助你了解更多,

rdd3.map(kv => {
  println(kv._1) // This represent String 
  println(kv._2) // This represent Array[String]
})

就这样做:

val rdd4 = rdd3.map(_._2)

您对 rdd2 = rdd.map(c => (c(7),c)) 所做的是将其映射到一个元组。 rdd2: org.apache.spark.rdd.RDD[(String, Array[String])] 正如它所说的那样:)。 现在,如果你想拆分记录,你需要从这个元组中获取它。 您可以再次映射,只取元组的第二部分(即 Array[String]... 的数组),如下所示:rdd3.map(_._2)

但我强烈建议使用 try rdd.sortBy(_(7)) 或类似的东西。这样你就不需要为元组之类的事情而烦恼了。

如果你想使用数组中的第7个字符串对rdd进行排序,你可以直接这样做

rdd.sortBy(_(6)) // array starts at 0 not 1

rdd.sortBy(arr => arr(6))

这将为您省去进行多次转换的所有麻烦。 rdd.sortBy(_._7)rdd.sortBy(x => x._7) 不起作用的原因是因为这不是您访问数组内元素的方式。要访问数组的第 7 个元素,比如 arr,您应该执行 arr(6).

为了对此进行测试,我执行了以下操作:

val rdd = sc.parallelize(Array(Array("ard", "bas", "wer"), Array("csg", "dip", "hwd"), Array("asg", "qtw", "hasd")))

// I want to sort it using the 3rd String
val sorted_rdd = rdd.sortBy(_(2))

结果如下:

Array(Array("ard", "bas", "wer"), Array("csg", "dip", "hwd"), Array("asg", "qtw", "hasd"))