Spark:有条件地 Joining/Concatting 基于列的前导字符
Spark: Conditionally Joining/Concatting Columns Based Leading Characters
我有一个数据集,其中包含未正确拆分的不干净数据。这导致列数不均匀 - 每行的列数取决于一个字段产生的错误数。如果列有 3 个前导双引号,您就知道它是否不正确。如果该列有 3 个前导双引号,那么您想将它与前一列连接起来并向左移动。
我将数据的 csv 导入数据框,创建类似于下面示例的内容。
示例:
输入:
`+--+--------+----------+----------+---------+
|id | detail | context | _c3 | _c4|
+---+--------+----------+----------+---------+
| 1 | {blah} | service | null | null |
| 2 | { blah | """ blah | """blah} | service |
| 3 | { blah | """blah} | service | null |
+---+--------+----------+----------+---------+`
期望的输出:
`+--+------------------------+---------+
|id | detail | context |
+---+------------------------+---------+
| 1 | {blah} | service |
| 2 | { blah""" blah"""blah} | service |
| 3 | { blah"""blah} | service |
+---+------------------------+---------+`
我尝试过类似的方法 - 以及许多其他方法:
`df.filter(col("context").startsWith("\"\"\"")).select($"detail", lit(" "), $"context").collect()`
这不起作用,并且没有完全完成我需要它做的事情。有任何想法吗?非常感谢帮助:)
谢谢!
我认为解决此问题的最简单方法是将列重新组合在一起,然后正确解析它们。一种方法是使用 concat 组合所有列,然后使用 regexp_extract 将您想要的部分作为单独的列拉出。例如:
case class MyRow(id: Int, detail: String, context: String, _c3: String, _c4: String)
val data = Seq(
MyRow(1, "{blah}", "service", "", ""),
MyRow(2, "{ blah", " \"\"\" blah", " \"\"\"blah}", "service"),
MyRow(3, "{ blah", "\"\"\"blah}", "service", "")
)
val df = sc.parallelize(data).toDF
val columns = df.columns.filterNot(_ == "id")
val nonulls = df.na.fill("")
val combined = nonulls.select($"id", concat(columns.map(col):_*) as "data")
val fixed = combined.withColumn("left", regexp_extract($"data", "(\{.*\})", 1)).
withColumn("right", regexp_extract($"data", "([^}]+$)", 1))
fixed.show(10, false)
应该输出:
+---+-------------------------------+------------------------+-------+
|id |data |left |right |
+---+-------------------------------+------------------------+-------+
|1 |{blah}service |{blah} |service|
|2 |{ blah """ blah """blah}service|{ blah """ blah """blah}|service|
|3 |{ blah"""blah}service |{ blah"""blah} |service|
+---+-------------------------------+------------------------+-------+
在上面的代码中,我假设列的顺序已经正确。
这只是在最后一个 } 上拆分。如果您需要更复杂的解析,您可以编写一个 UDF 来根据需要解析它和 returns 一个字段元组。
我有一个数据集,其中包含未正确拆分的不干净数据。这导致列数不均匀 - 每行的列数取决于一个字段产生的错误数。如果列有 3 个前导双引号,您就知道它是否不正确。如果该列有 3 个前导双引号,那么您想将它与前一列连接起来并向左移动。
我将数据的 csv 导入数据框,创建类似于下面示例的内容。
示例:
输入:
`+--+--------+----------+----------+---------+
|id | detail | context | _c3 | _c4|
+---+--------+----------+----------+---------+
| 1 | {blah} | service | null | null |
| 2 | { blah | """ blah | """blah} | service |
| 3 | { blah | """blah} | service | null |
+---+--------+----------+----------+---------+`
期望的输出:
`+--+------------------------+---------+
|id | detail | context |
+---+------------------------+---------+
| 1 | {blah} | service |
| 2 | { blah""" blah"""blah} | service |
| 3 | { blah"""blah} | service |
+---+------------------------+---------+`
我尝试过类似的方法 - 以及许多其他方法:
`df.filter(col("context").startsWith("\"\"\"")).select($"detail", lit(" "), $"context").collect()`
这不起作用,并且没有完全完成我需要它做的事情。有任何想法吗?非常感谢帮助:)
谢谢!
我认为解决此问题的最简单方法是将列重新组合在一起,然后正确解析它们。一种方法是使用 concat 组合所有列,然后使用 regexp_extract 将您想要的部分作为单独的列拉出。例如:
case class MyRow(id: Int, detail: String, context: String, _c3: String, _c4: String)
val data = Seq(
MyRow(1, "{blah}", "service", "", ""),
MyRow(2, "{ blah", " \"\"\" blah", " \"\"\"blah}", "service"),
MyRow(3, "{ blah", "\"\"\"blah}", "service", "")
)
val df = sc.parallelize(data).toDF
val columns = df.columns.filterNot(_ == "id")
val nonulls = df.na.fill("")
val combined = nonulls.select($"id", concat(columns.map(col):_*) as "data")
val fixed = combined.withColumn("left", regexp_extract($"data", "(\{.*\})", 1)).
withColumn("right", regexp_extract($"data", "([^}]+$)", 1))
fixed.show(10, false)
应该输出:
+---+-------------------------------+------------------------+-------+
|id |data |left |right |
+---+-------------------------------+------------------------+-------+
|1 |{blah}service |{blah} |service|
|2 |{ blah """ blah """blah}service|{ blah """ blah """blah}|service|
|3 |{ blah"""blah}service |{ blah"""blah} |service|
+---+-------------------------------+------------------------+-------+
在上面的代码中,我假设列的顺序已经正确。
这只是在最后一个 } 上拆分。如果您需要更复杂的解析,您可以编写一个 UDF 来根据需要解析它和 returns 一个字段元组。