Spark:有条件地 Joining/Concatting 基于列的前导字符

Spark: Conditionally Joining/Concatting Columns Based Leading Characters

我有一个数据集,其中包含未正确拆分的不干净数据。这导致列数不均匀 - 每行的列数取决于一个字段产生的错误数。如果列有 3 个前导双引号,您就知道它是否不正确。如果该列有 3 个前导双引号,那么您想将它与前一列连接起来并向左移动。

我将数据的 csv 导入数据框,创建类似于下面示例的内容。

示例:

输入:

`+--+--------+----------+----------+---------+
|id | detail | context  |      _c3 |      _c4|
+---+--------+----------+----------+---------+
| 1 | {blah} | service  | null     | null    |
| 2 | { blah | """ blah | """blah} | service |
| 3 | { blah | """blah} | service  | null    |
+---+--------+----------+----------+---------+`

期望的输出:

`+--+------------------------+---------+
|id | detail                 | context |
+---+------------------------+---------+
| 1 | {blah}                 | service |
| 2 | { blah""" blah"""blah} | service |
| 3 | { blah"""blah}         | service |
+---+------------------------+---------+`

我尝试过类似的方法 - 以及许多其他方法:

`df.filter(col("context").startsWith("\"\"\"")).select($"detail", lit(" "), $"context").collect()`

这不起作用,并且没有完全完成我需要它做的事情。有任何想法吗?非常感谢帮助:)

谢谢!

我认为解决此问题的最简单方法是将列重新组合在一起,然后正确解析它们。一种方法是使用 concat 组合所有列,然后使用 regexp_extract 将您想要的部分作为单独的列拉出。例如:

case class MyRow(id: Int, detail: String, context: String, _c3: String, _c4: String)
val data = Seq(
    MyRow(1, "{blah}", "service", "", ""),
    MyRow(2, "{ blah", " \"\"\" blah", " \"\"\"blah}", "service"),
    MyRow(3, "{ blah", "\"\"\"blah}", "service", "")
)

val df = sc.parallelize(data).toDF

val columns = df.columns.filterNot(_ == "id")

val nonulls = df.na.fill("")
val combined = nonulls.select($"id", concat(columns.map(col):_*) as "data")

val fixed = combined.withColumn("left", regexp_extract($"data", "(\{.*\})", 1)).
               withColumn("right", regexp_extract($"data", "([^}]+$)", 1))

fixed.show(10, false)

应该输出:

+---+-------------------------------+------------------------+-------+
|id |data                           |left                    |right  |
+---+-------------------------------+------------------------+-------+
|1  |{blah}service                  |{blah}                  |service|
|2  |{ blah """ blah """blah}service|{ blah """ blah """blah}|service|
|3  |{ blah"""blah}service          |{ blah"""blah}          |service|
+---+-------------------------------+------------------------+-------+

在上面的代码中,我假设列的顺序已经正确。

这只是在最后一个 } 上拆分。如果您需要更复杂的解析,您可以编写一个 UDF 来根据需要解析它和 returns 一个字段元组。