在 Spark Scala 中将具有等级的列添加到 rdd
Add a column with a rank to an rdd in Spark Scala
不幸的是,我们仍然必须使用 spark 1.0.0 并且需要使用 RDD。
我有一个从 CSV 文件创建的 RDD。
val serialRDD = sc.textFile(path)
如果我们打印 RDD 的每一行,我们会得到这样的结果(一个 id 和一个字符串):
1929 abc
2384 def
8753 ghi
3893 jkl
我希望能够添加另一列作为另一个 id,它将是一个类似于 "SERIAL-" 的字符串,其中 RANK 将是 1,2,3 等自动递增 1
输出应该是这样的:
1929 abc SERIAL-1
2384 def SERIAL-2
8753 ghi SERIAL-3
3893 jkl SERIAL-4
如何使用 RDD 完成此操作?
您可以使用 zipWithIndex
和 map
来完成:
serialRDD.zipWithIndex.map{ case (r, i) => (r._1, r._2, s"SERIAL-${i+1}") }
我使用字符串插值来获取 SERIAL-X
字符串。我还增加了索引,因为 zipWithIndex
从索引 0 开始。
不幸的是,我们仍然必须使用 spark 1.0.0 并且需要使用 RDD。 我有一个从 CSV 文件创建的 RDD。
val serialRDD = sc.textFile(path)
如果我们打印 RDD 的每一行,我们会得到这样的结果(一个 id 和一个字符串):
1929 abc
2384 def
8753 ghi
3893 jkl
我希望能够添加另一列作为另一个 id,它将是一个类似于 "SERIAL-" 的字符串,其中 RANK 将是 1,2,3 等自动递增 1
输出应该是这样的:
1929 abc SERIAL-1
2384 def SERIAL-2
8753 ghi SERIAL-3
3893 jkl SERIAL-4
如何使用 RDD 完成此操作?
您可以使用 zipWithIndex
和 map
来完成:
serialRDD.zipWithIndex.map{ case (r, i) => (r._1, r._2, s"SERIAL-${i+1}") }
我使用字符串插值来获取 SERIAL-X
字符串。我还增加了索引,因为 zipWithIndex
从索引 0 开始。