Spark - Rdd 字符串 Cleaning/Manipulation
Spark - Rdd String Cleaning/Manipulation
我有一个 spark.rdd.RDD[String] MapPartition
是我用过滤器创建的。
val myMapPartition = myTextFile.filter(_.split("\t")(2) == "\"red\"")
此过滤器通过制表符分隔符拆分我的文本文件行,并检查结果数组的第二个元素是否等于 "red"
myMapPartition.collect()
returns Array
类型 String
。这是一个例子:
24344 "someString" "red"
23421 "someOtherString" "red"
我正在尝试对字符串进行一些编辑。最终,我查看了一些字符串替换逻辑,但我尝试先连接一个字符串。所以我会寻找这样的东西:
24344 "someString hello" "red"
23421 "someOtherString hello" "red"
我尝试使用 map
:
来完成此操作
val myCleanRdd = myMapPartition.map(_1 => (_1.concat(" hello")))
然而,我得到了:
24344 "someString" "red" hello
23421 "someOtherString" "red" hello
我的问题是如何操作 rdd 行的某些元素?我认为问题在于我的行被认为是一个 String
。我不确定如何正确映射它以让我专注于各个领域。
免责声明: Scala/Spark 菜鸟
您首先需要将 split
映射到原始 RDD 的每个元素上,因此您最终得到 RDD[Array[String]]
而不是 RDD[String]
,例如
myTextFile.map(_.split("\t")).filter(_(2) == "\"red\"")
目前您正在使用 split
来过滤字符串的输入 RDD,但这只会创建字符串的输出 RDD,从而丢弃您对 split
它们所做的工作。
然后,如果你的 RDD 的每个元素都是一个已知长度的 Array[String]
,那么你可以 map
使用模式匹配(使用 case
关键字)来提取和修改单个元素,例如:
rdd.map { case Array(x, y, z) => Array(x, y + " hello", z) }
(请注意,在使用此方法时,您必须在 map
函数周围使用大括号 {}
而不是圆括号 ()
)。可以对列表、元组、向量等行进行类似的模式匹配...
更新:如果你想用处理过的版本替换其中一个元素,这是类似的模式,例如
rdd.map { case Array(x, y, z) => Array(x, y.replace("s","x"), z) }
要打印出 RDD[Array[String]]
的所有元素,您可以执行嵌套 foreach
,例如
rdd.foreach(_.foreach(println))
由于重载方法(通常使用 Arrays.toString
但在 Scala 中使用 seems to cause type problems ),将每一行作为数组打印出来比预期的要棘手,但可以按如下方式完成:
rdd.foreach(row => println(row.mkString("[",",","]")))
我有一个 spark.rdd.RDD[String] MapPartition
是我用过滤器创建的。
val myMapPartition = myTextFile.filter(_.split("\t")(2) == "\"red\"")
此过滤器通过制表符分隔符拆分我的文本文件行,并检查结果数组的第二个元素是否等于 "red"
myMapPartition.collect()
returns Array
类型 String
。这是一个例子:
24344 "someString" "red"
23421 "someOtherString" "red"
我正在尝试对字符串进行一些编辑。最终,我查看了一些字符串替换逻辑,但我尝试先连接一个字符串。所以我会寻找这样的东西:
24344 "someString hello" "red"
23421 "someOtherString hello" "red"
我尝试使用 map
:
val myCleanRdd = myMapPartition.map(_1 => (_1.concat(" hello")))
然而,我得到了:
24344 "someString" "red" hello
23421 "someOtherString" "red" hello
我的问题是如何操作 rdd 行的某些元素?我认为问题在于我的行被认为是一个 String
。我不确定如何正确映射它以让我专注于各个领域。
免责声明: Scala/Spark 菜鸟
您首先需要将 split
映射到原始 RDD 的每个元素上,因此您最终得到 RDD[Array[String]]
而不是 RDD[String]
,例如
myTextFile.map(_.split("\t")).filter(_(2) == "\"red\"")
目前您正在使用 split
来过滤字符串的输入 RDD,但这只会创建字符串的输出 RDD,从而丢弃您对 split
它们所做的工作。
然后,如果你的 RDD 的每个元素都是一个已知长度的 Array[String]
,那么你可以 map
使用模式匹配(使用 case
关键字)来提取和修改单个元素,例如:
rdd.map { case Array(x, y, z) => Array(x, y + " hello", z) }
(请注意,在使用此方法时,您必须在 map
函数周围使用大括号 {}
而不是圆括号 ()
)。可以对列表、元组、向量等行进行类似的模式匹配...
更新:如果你想用处理过的版本替换其中一个元素,这是类似的模式,例如
rdd.map { case Array(x, y, z) => Array(x, y.replace("s","x"), z) }
要打印出 RDD[Array[String]]
的所有元素,您可以执行嵌套 foreach
,例如
rdd.foreach(_.foreach(println))
由于重载方法(通常使用 Arrays.toString
但在 Scala 中使用 seems to cause type problems ),将每一行作为数组打印出来比预期的要棘手,但可以按如下方式完成:
rdd.foreach(row => println(row.mkString("[",",","]")))