如何拆分 Spark rdd Array[(String, Array[String])]?
How do I split a Spark rdd Array[(String, Array[String])]?
我正在练习如何在 Spark 中进行排序 shell。我有一个大约有 10 columns/variables 的 rdd。我想根据第 7 列的值对整个 rdd 进行排序。
rdd
org.apache.spark.rdd.RDD[Array[String]] = ...
据我所知,这样做的方法是使用 sortByKey,而这又只适用于成对。所以我映射了它,所以我有一对由 column7 (字符串值)和完整的原始 rdd (字符串数组)组成
rdd2 = rdd.map(c => (c(7),c))
rdd2: org.apache.spark.rdd.RDD[(String, Array[String])] = ...
然后我应用sortByKey,还是没问题...
rdd3 = rdd2.sortByKey()
rdd3: org.apache.spark.rdd.RDD[(String, Array[String])] = ...
但是现在我如何从 rdd3 (Array[String]) 中分离、收集和保存排序的原始 rdd?每当我尝试在 rdd3 上进行拆分时,它都会给我一个错误:
val rdd4 = rdd3.map(_.split(',')(2))
<console>:33: error: value split is not a member of (String, Array[String])
我在这里做错了什么?是否有其他更好的方法对 rdd 的其中一列进行排序?
我还以为你不熟悉 Scala,
所以,下面应该可以帮助你了解更多,
rdd3.map(kv => {
println(kv._1) // This represent String
println(kv._2) // This represent Array[String]
})
就这样做:
val rdd4 = rdd3.map(_._2)
您对 rdd2 = rdd.map(c => (c(7),c))
所做的是将其映射到一个元组。
rdd2: org.apache.spark.rdd.RDD[(String, Array[String])]
正如它所说的那样:)。
现在,如果你想拆分记录,你需要从这个元组中获取它。
您可以再次映射,只取元组的第二部分(即 Array[String]... 的数组),如下所示:rdd3.map(_._2)
但我强烈建议使用 try rdd.sortBy(_(7))
或类似的东西。这样你就不需要为元组之类的事情而烦恼了。
如果你想使用数组中的第7个字符串对rdd进行排序,你可以直接这样做
rdd.sortBy(_(6)) // array starts at 0 not 1
或
rdd.sortBy(arr => arr(6))
这将为您省去进行多次转换的所有麻烦。 rdd.sortBy(_._7)
或 rdd.sortBy(x => x._7)
不起作用的原因是因为这不是您访问数组内元素的方式。要访问数组的第 7 个元素,比如 arr
,您应该执行 arr(6)
.
为了对此进行测试,我执行了以下操作:
val rdd = sc.parallelize(Array(Array("ard", "bas", "wer"), Array("csg", "dip", "hwd"), Array("asg", "qtw", "hasd")))
// I want to sort it using the 3rd String
val sorted_rdd = rdd.sortBy(_(2))
结果如下:
Array(Array("ard", "bas", "wer"), Array("csg", "dip", "hwd"), Array("asg", "qtw", "hasd"))
我正在练习如何在 Spark 中进行排序 shell。我有一个大约有 10 columns/variables 的 rdd。我想根据第 7 列的值对整个 rdd 进行排序。
rdd
org.apache.spark.rdd.RDD[Array[String]] = ...
据我所知,这样做的方法是使用 sortByKey,而这又只适用于成对。所以我映射了它,所以我有一对由 column7 (字符串值)和完整的原始 rdd (字符串数组)组成
rdd2 = rdd.map(c => (c(7),c))
rdd2: org.apache.spark.rdd.RDD[(String, Array[String])] = ...
然后我应用sortByKey,还是没问题...
rdd3 = rdd2.sortByKey()
rdd3: org.apache.spark.rdd.RDD[(String, Array[String])] = ...
但是现在我如何从 rdd3 (Array[String]) 中分离、收集和保存排序的原始 rdd?每当我尝试在 rdd3 上进行拆分时,它都会给我一个错误:
val rdd4 = rdd3.map(_.split(',')(2))
<console>:33: error: value split is not a member of (String, Array[String])
我在这里做错了什么?是否有其他更好的方法对 rdd 的其中一列进行排序?
我还以为你不熟悉 Scala, 所以,下面应该可以帮助你了解更多,
rdd3.map(kv => {
println(kv._1) // This represent String
println(kv._2) // This represent Array[String]
})
就这样做:
val rdd4 = rdd3.map(_._2)
您对 rdd2 = rdd.map(c => (c(7),c))
所做的是将其映射到一个元组。
rdd2: org.apache.spark.rdd.RDD[(String, Array[String])]
正如它所说的那样:)。
现在,如果你想拆分记录,你需要从这个元组中获取它。
您可以再次映射,只取元组的第二部分(即 Array[String]... 的数组),如下所示:rdd3.map(_._2)
但我强烈建议使用 try rdd.sortBy(_(7))
或类似的东西。这样你就不需要为元组之类的事情而烦恼了。
如果你想使用数组中的第7个字符串对rdd进行排序,你可以直接这样做
rdd.sortBy(_(6)) // array starts at 0 not 1
或
rdd.sortBy(arr => arr(6))
这将为您省去进行多次转换的所有麻烦。 rdd.sortBy(_._7)
或 rdd.sortBy(x => x._7)
不起作用的原因是因为这不是您访问数组内元素的方式。要访问数组的第 7 个元素,比如 arr
,您应该执行 arr(6)
.
为了对此进行测试,我执行了以下操作:
val rdd = sc.parallelize(Array(Array("ard", "bas", "wer"), Array("csg", "dip", "hwd"), Array("asg", "qtw", "hasd")))
// I want to sort it using the 3rd String
val sorted_rdd = rdd.sortBy(_(2))
结果如下:
Array(Array("ard", "bas", "wer"), Array("csg", "dip", "hwd"), Array("asg", "qtw", "hasd"))