Convert List of strings to list of structs

I have a dataframe with two columns. The first is a column of unique IDs, and the second is a colon-delimited list of student scores (this is after loading it from a CSV with no headers).

Is there any mechanism to convert that second column into a list of structs for further processing? Or into a dynamic number of additional columns? I just need a way to do further processing on the scores for each id, e.g. compute the average for id 0000000003, which can't be done with the data in its current format.

+----------+-----------------------------+
|id        |scores                       |
+----------+-----------------------------+
|0000000003|brian,1.0:steve,2.3:allie,8.0|
|0783563078|chris,1.0                    |
|0783801254|michelle,1.0:vixon,2.3       |
+----------+-----------------------------+

into

+----------+--------------------------------------------------------------------------+
|id        |scores                                                                    |
+----------+--------------------------------------------------------------------------+
|0000000003|[{student -> brian, score -> 1.0 } , {student -> steve, score -> 2.3 .... |
+----------+--------------------------------------------------------------------------+

Or maybe something like this:

+----------+--------+------+--------+------+------+
|id        |student1|score1|student2|score2|etc...|
+----------+--------+------+--------+------+------+
|0000000003|        |      |        |      |      |
+----------+--------+------+--------+------+------+

I'm just not sure how to get this data format into something I can process.

Approach 4 is probably the shortest way to get the average, but the other approaches let you extract the data as maps/structs.
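
All of the examples below are PySpark and assume a DataFrame df with the two string columns shown above. A minimal sketch to reproduce it (the sample rows and the spark session are for illustration only; in practice df comes from your CSV load):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Sample data matching the layout shown in the question
df = spark.createDataFrame(
    [
        ("0000000003", "brian,1.0:steve,2.3:allie,8.0"),
        ("0783563078", "chris,1.0"),
        ("0783801254", "michelle,1.0:vixon,2.3"),
    ],
    ["id", "scores"],
)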

Approach 1

A readily accessible approach could be to use str_to_map, which will convert your string value to a map. You could then use map_values to extract the scores, e.g.

(
    df.withColumn(
        "score_map",
        F.expr("str_to_map(scores, ':', ',')")
    ).withColumn(
        "score_values",
        F.map_values(F.expr("str_to_map(scores, ':', ',')"))
    )
).show(truncate=False)
+----------+-----------------------------+------------------------------------------+---------------+
|id        |scores                       |score_map                                 |score_values   |
+----------+-----------------------------+------------------------------------------+---------------+
|0000000003|brian,1.0:steve,2.3:allie,8.0|{brian -> 1.0, steve -> 2.3, allie -> 8.0}|[1.0, 2.3, 8.0]|
|0783563078|chris,1.0                    |{chris -> 1.0}                            |[1.0]          |
|0783801254|michelle,1.0:vixon,2.3       |{michelle -> 1.0, vixon -> 2.3}           |[1.0, 2.3]     |
+----------+-----------------------------+------------------------------------------+---------------+
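
If you also need the student names, map_keys extracts the keys of the same map. A small sketch reusing the score_map column built above:

(
    df.withColumn("score_map", F.expr("str_to_map(scores, ':', ',')"))
    .withColumn("students", F.map_keys("score_map"))         # e.g. [brian, steve, allie]
    .withColumn("score_values", F.map_values("score_map"))   # e.g. [1.0, 2.3, 8.0]
).show(truncate=False)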

Since you are only interested in the average score, you could also use explode to split the array returned by map_values into multiple rows before aggregating with mean. In the example below I include the original scores column in the group-by, but you may remove it and get the same result in your application.

(
    df.withColumn(
        "score_values",
        F.explode(F.map_values(F.expr("str_to_map(scores, ':', ',')")))
    )
    .groupBy("id", "scores")  # you may remove "scores" here to keep only the id
    .agg(
        F.mean("score_values").alias("score_avg")
    )
).show(truncate=False)
+----------+-----------------------------+-----------------+
|id        |scores                       |score_avg        |
+----------+-----------------------------+-----------------+
|0000000003|brian,1.0:steve,2.3:allie,8.0|3.766666666666667|
|0783801254|michelle,1.0:vixon,2.3       |1.65             |
|0783563078|chris,1.0                    |1.0              |
+----------+-----------------------------+-----------------+

Approach 2

If you prefer working with a struct, you can use split, transform, and named_struct in Spark SQL to convert your data into the desired struct, e.g.

df2 = (
    df.withColumn(
        "score_struct",
        F.expr("transform(split(scores, ':'), x -> named_struct('student', split(x, ',')[0], 'score', split(x, ',')[1]))")
    )
)
df2.printSchema()
df2.show(truncate=False)
root
 |-- id: string (nullable = true)
 |-- scores: string (nullable = true)
 |-- score_struct: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- student: string (nullable = true)
 |    |    |-- score: string (nullable = true)

+----------+-----------------------------+------------------------------------------+
|id        |scores                       |score_struct                              |
+----------+-----------------------------+------------------------------------------+
|0000000003|brian,1.0:steve,2.3:allie,8.0|[{brian, 1.0}, {steve, 2.3}, {allie, 8.0}]|
|0783563078|chris,1.0                    |[{chris, 1.0}]                            |
|0783801254|michelle,1.0:vixon,2.3       |[{michelle, 1.0}, {vixon, 2.3}]           |
+----------+-----------------------------+------------------------------------------+
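
Once the scores are an array of structs, the fields can be referenced directly for further processing. For example, selecting score_struct.student and score_struct.score yields per-row arrays of the names and score strings (a small usage sketch on df2 from above):

df2.select(
    "id",
    F.col("score_struct.student").alias("students"),     # array of names per row
    F.col("score_struct.score").alias("score_values"),   # array of score strings per row
).show(truncate=False)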

We can again use explode to split the list of values in each row into multiple rows before using mean to determine the average, e.g.

df2 = (
    df.withColumn(
        "score_struct",
        F.expr("explode(transform(split(scores, ':'), x -> named_struct('student', split(x, ',')[0], 'score', split(x, ',')[1])))")
    )
    .groupBy("id")
    .agg(
        F.mean("score_struct.score").alias("score_avg")
    )
)
df2.printSchema()
df2.show(truncate=False)
root
 |-- id: string (nullable = true)
 |-- score_avg: double (nullable = true)

+----------+-----------------+
|id        |score_avg        |
+----------+-----------------+
|0000000003|3.766666666666667|
|0783563078|1.0              |
|0783801254|1.65             |
+----------+-----------------+

Approach 3

You could simply use Approach 2 but extract only the value you want, i.e. the score, before computing the average, e.g.:

df2 = (
    df.withColumn(
        "score",
        F.expr("explode(transform(split(scores, ':'), x -> split(x, ',')[1]))")
    )
    .groupBy("id")
    .agg(
        F.mean("score").alias("score_avg")
    )
)
df2.printSchema()
df2.show(truncate=False)
root
 |-- id: string (nullable = true)
 |-- score_avg: double (nullable = true)

+----------+-----------------+
|id        |score_avg        |
+----------+-----------------+
|0000000003|3.766666666666667|
|0783563078|1.0              |
|0783801254|1.65             |
+----------+-----------------+

Approach 4

This approach uses split and aggregate to compute the sum of the scores in each row, then divides by the number of entries to get the average.

df2 = (
    df.withColumn(
        "scores",
        F.split("scores", ":")
    )
    .withColumn(
        "scores",
        F.expr("aggregate(scores, cast(0 as double), (acc, x) -> acc + split(x, ',')[1])") / F.size("scores")
    )
)
df2.printSchema()
df2.show(truncate=False)
root
 |-- id: string (nullable = true)
 |-- scores: double (nullable = true)

+----------+-----------------+
|id        |scores           |
+----------+-----------------+
|0000000003|3.766666666666667|
|0783563078|1.0              |
|0783801254|1.65             |
+----------+-----------------+
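
As a variant, aggregate also accepts an optional finish lambda, so the division by the number of entries can be folded into the same expression. A sketch assuming Spark 2.4+ (same logic as above, just expressed in one aggregate call):

df2 = (
    df.withColumn("scores", F.split("scores", ":"))
    .withColumn(
        "scores",
        # sum the score part of each entry, then divide by the array size in the finish lambda
        F.expr("aggregate(scores, cast(0 as double), (acc, x) -> acc + split(x, ',')[1], acc -> acc / size(scores))")
    )
)
df2.show(truncate=False)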