Convert a list of strings to a list of structs
I have a dataframe with two columns. The first is a column of unique IDs, and the second is a colon-delimited list of student scores (this is after loading it from a CSV with no headers).
Is there any mechanism to convert the second column into a list of structs for further processing? Or into a dynamic number of additional columns? I just need a way to do further processing on the scores for each id, i.e. compute the average for id 0000000003, which can't be done with the data in its current format.
i.e.
+----------+-----------------------------+
|id |scores |
+----------+-----------------------------+
|0000000003|brian,1.0:steve,2.3:allie,8.0|
|0783563078|chris,1.0 |
|0783801254|michelle,1.0:vixon,2.3 |
+----------+-----------------------------+
into
+----------+--------------------------------------------------------------------------+
|id |scores |
+----------+--------------------------------------------------------------------------+
|0000000003|[{student -> brian, score -> 1.0 } , {student -> steve, score -> 2.3 .... |
+----------+--------------------------------------------------------------------------+
or possibly something like this:
+----------+--------+------+--------+------+------+
|id |student1|score1|student2|score3|etc...|
+----------+--------+------+--------+------+------+
|0000000003| | | | | |
+----------+--------+------+--------+------+------+
I'm just not sure how to get this data format into something processable.
Approach 4 is probably the shortest way to get the average, but the other approaches let you extract the data as maps/structs.
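To make the target result concrete before diving into Spark, the per-id average that each approach below computes can be checked with a plain-Python sketch (no Spark required; the sample rows are copied from the table above, and the helper name is just for illustration):

```python
def average_scores(scores: str) -> float:
    # entries look like "brian,1.0:steve,2.3:allie,8.0":
    # ':' separates entries, ',' separates student from score
    values = [float(entry.split(",")[1]) for entry in scores.split(":")]
    return sum(values) / len(values)

rows = {
    "0000000003": "brian,1.0:steve,2.3:allie,8.0",
    "0783563078": "chris,1.0",
    "0783801254": "michelle,1.0:vixon,2.3",
}
averages = {row_id: average_scores(s) for row_id, s in rows.items()}
# averages["0000000003"] is approximately 3.7667
```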
Approach 1
An easily accessible approach could be to use str_to_map, which will convert your string value to a map. You could then use map_values to extract the scores, e.g.
from pyspark.sql import functions as F

(
    df.withColumn(
        "score_map",
        F.expr("str_to_map(scores,':',',')")
    ).withColumn(
        "score_values",
        F.map_values(F.expr("str_to_map(scores,':',',')"))
    )
).show(truncate=False)
+----------+-----------------------------+------------------------------------------+---------------+
|id |scores |score_map |score_values |
+----------+-----------------------------+------------------------------------------+---------------+
|0000000003|brian,1.0:steve,2.3:allie,8.0|{brian -> 1.0, steve -> 2.3, allie -> 8.0}|[1.0, 2.3, 8.0]|
|0783563078|chris,1.0 |{chris -> 1.0} |[1.0] |
|0783801254|michelle,1.0:vixon,2.3 |{michelle -> 1.0, vixon -> 2.3} |[1.0, 2.3] |
+----------+-----------------------------+------------------------------------------+---------------+
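As a sanity check outside Spark, str_to_map with pair delimiter ':' and key/value delimiter ',' behaves roughly like the plain-Python sketch below (an illustration of the semantics, not Spark's implementation; note the values remain strings):

```python
def str_to_map(text: str, pair_delim: str = ":", kv_delim: str = ",") -> dict:
    # split into "student,score" pairs, then each pair into key and value
    return dict(pair.split(kv_delim, 1) for pair in text.split(pair_delim))

m = str_to_map("brian,1.0:steve,2.3:allie,8.0")
# m == {"brian": "1.0", "steve": "2.3", "allie": "8.0"}
```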
Since you are only interested in the average score, you could also use explode to split the array returned by map_values into multiple rows before aggregating with mean. In the example below I've included the original scores column in the group by, but you may remove it and achieve the same result in your application.
(
    df.withColumn(
        "score_values",
        F.explode(F.map_values(F.expr("str_to_map(scores,':',',')")))
    )
    .groupBy("id", "scores")  # you may remove "scores" here to group by id only
    .agg(
        F.mean("score_values").alias("score_avg")
    )
).show(truncate=False)
+----------+-----------------------------+-----------------+
|id |scores |score_avg |
+----------+-----------------------------+-----------------+
|0000000003|brian,1.0:steve,2.3:allie,8.0|3.766666666666667|
|0783801254|michelle,1.0:vixon,2.3 |1.65 |
|0783563078|chris,1.0 |1.0 |
+----------+-----------------------------+-----------------+
Approach 2
If you prefer working with a struct, you may use split, transform and named_struct in spark-sql to transform your data into the desired struct, e.g.
df2 = (
    df.withColumn(
        "score_struct",
        F.expr("transform(split(scores,':'), x -> named_struct('student', split(x,',')[0], 'score', split(x,',')[1]))")
    )
)
df2.printSchema()
df2.show(truncate=False)
root
|-- id: string (nullable = true)
|-- scores: string (nullable = true)
|-- score_struct: array (nullable = true)
| |-- element: struct (containsNull = false)
| | |-- student: string (nullable = true)
| | |-- score: string (nullable = true)
+----------+-----------------------------+------------------------------------------+
|id |scores |score_struct |
+----------+-----------------------------+------------------------------------------+
|0000000003|brian,1.0:steve,2.3:allie,8.0|[{brian, 1.0}, {steve, 2.3}, {allie, 8.0}]|
|0783563078|chris,1.0 |[{chris, 1.0}] |
|0783801254|michelle,1.0:vixon,2.3 |[{michelle, 1.0}, {vixon, 2.3}] |
+----------+-----------------------------+------------------------------------------+
We can again use explode to split the list of values in each row into multiple rows before determining the average with mean, e.g.
df2 = (
    df.withColumn(
        "score_struct",
        F.expr("explode(transform(split(scores,':'), x -> named_struct('student', split(x,',')[0], 'score', split(x,',')[1])))")
    )
    .groupBy("id")
    .agg(
        F.mean("score_struct.score").alias("score_avg")
    )
)
df2.printSchema()
df2.show(truncate=False)
root
|-- id: string (nullable = true)
|-- score_avg: double (nullable = true)
+----------+-----------------+
|id |score_avg |
+----------+-----------------+
|0000000003|3.766666666666667|
|0783563078|1.0 |
|0783801254|1.65 |
+----------+-----------------+
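Conceptually, the transform + named_struct expression is just the following parse, sketched here with a plain namedtuple whose field names match the named_struct call (an illustration only; note the score field stays a string, as the printed schema above shows):

```python
from collections import namedtuple

Entry = namedtuple("Entry", ["student", "score"])

def parse_scores(scores: str) -> list:
    # mirrors: transform(split(scores,':'), x -> named_struct('student', ..., 'score', ...))
    return [Entry(*pair.split(",", 1)) for pair in scores.split(":")]

entries = parse_scores("michelle,1.0:vixon,2.3")
# entries == [Entry(student='michelle', score='1.0'), Entry(student='vixon', score='2.3')]
```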
Approach 3
You can simplify Approach 2 by extracting only the value you are interested in, i.e. the score, before computing the average, e.g.:
df2 = (
    df.withColumn(
        "score",
        F.expr("explode(transform(split(scores,':'), x -> split(x,',')[1]))")
    )
    .groupBy("id")
    .agg(
        F.mean("score").alias("score_avg")
    )
)
df2.printSchema()
df2.show(truncate=False)
root
|-- id: string (nullable = true)
|-- score_avg: double (nullable = true)
+----------+-----------------+
|id |score_avg |
+----------+-----------------+
|0000000003|3.766666666666667|
|0783563078|1.0 |
|0783801254|1.65 |
+----------+-----------------+
Approach 4
This approach uses split and aggregate to derive the sum for each row before dividing by the number of entries to find the average.
df2 = (
    df.withColumn(
        "scores",
        F.split("scores", ":")
    )
    .withColumn(
        "scores",
        F.expr("aggregate(scores, cast(0 as double), (acc, x) -> acc + split(x,',')[1])") / F.size("scores")
    )
)
df2.printSchema()
df2.show(truncate=False)
root
|-- id: string (nullable = true)
|-- scores: double (nullable = true)
+----------+-----------------+
|id |scores |
+----------+-----------------+
|0000000003|3.766666666666667|
|0783563078|1.0 |
|0783801254|1.65 |
+----------+-----------------+
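The aggregate call here is a left fold over the split array; in plain Python the same computation looks like the sketch below (illustrating the semantics only, not Spark's implementation):

```python
from functools import reduce

entries = "brian,1.0:steve,2.3:allie,8.0".split(":")
# fold: the accumulator starts at 0.0 and each step adds the score part of one entry
total = reduce(lambda acc, x: acc + float(x.split(",")[1]), entries, 0.0)
avg = total / len(entries)
```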