Spark - Rdd 字符串 Cleaning/Manipulation

Spark - Rdd String Cleaning/Manipulation

我有一个 spark.rdd.RDD[String] MapPartition 是我用过滤器创建的。

val myMapPartition = myTextFile.filter(_.split("\t")(2) == "\"red\"")

此过滤器通过制表符分隔符拆分我的文本文件行,并检查结果数组的第二个元素是否等于 "red"

myMapPartition.collect() returns Array 类型 String。这是一个例子:

24344 "someString" "red"
23421 "someOtherString" "red"

我正在尝试对字符串进行一些编辑。最终,我查看了一些字符串替换逻辑,但我尝试先连接一个字符串。所以我会寻找这样的东西:

24344 "someString hello" "red"
23421 "someOtherString hello" "red"

我尝试使用 map:

来完成此操作
val myCleanRdd = myMapPartition.map(_1 => (_1.concat(" hello")))

然而,我得到了:

24344 "someString" "red" hello
23421 "someOtherString" "red" hello

我的问题是如何操作 rdd 行的某些元素?我认为问题在于我的行被认为是一个 String。我不确定如何正确映射它以让我专注于各个领域。

免责声明: Scala/Spark 菜鸟

您首先需要将 split 映射到原始 RDD 的每个元素上,因此您最终得到 RDD[Array[String]] 而不是 RDD[String],例如

myTextFile.map(_.split("\t")).filter(_(2) == "\"red\"")

目前您正在使用 split 来过滤字符串的输入 RDD,但这只会创建字符串的输出 RDD,从而丢弃您对 split 它们所做的工作。

然后,如果你的 RDD 的每个元素都是一个已知长度的 Array[String],那么你可以 map 使用模式匹配(使用 case 关键字)来提取和修改单个元素,例如:

rdd.map { case Array(x, y, z) => Array(x, y + " hello", z) }

(请注意,在使用此方法时,您必须在 map 函数周围使用大括号 {} 而不是圆括号 ())。可以对列表、元组、向量等行进行类似的模式匹配...

更新:如果你想用处理过的版本替换其中一个元素,这是类似的模式,例如

rdd.map { case Array(x, y, z) => Array(x, y.replace("s","x"), z) }

要打印出 RDD[Array[String]] 的所有元素,您可以执行嵌套 foreach,例如

rdd.foreach(_.foreach(println))

由于重载方法(通常使用 Arrays.toString 但在 Scala 中使用 seems to cause type problems ),将每一行作为数组打印出来比预期的要棘手,但可以按如下方式完成:

rdd.foreach(row => println(row.mkString("[",",","]")))