spark - 计算 2 列或更多列中值的平均值并在每一行中放入新列
spark - Calculating average of values in 2 or more columns and putting in new column in every row
假设我有一个 Dataset/Dataframe 包含以下内容:-
name, marks1, marks2
Alice, 10, 20
Bob, 20, 30
我想添加一个新列,它应该是 B 列和 C 列的平均值。
预期结果:-
name, marks1, marks2, Result(Avg)
Alice, 10, 20, 15
Bob, 20, 30, 25
用于求和或我使用的任何其他算术运算 df.withColumn("xyz", $"marks1"+$"marks2")
。我找不到 Average 的类似方法。请帮忙。
另外:- 列数不固定。有时它可能是 2 列的平均值,有时是 3 列甚至更多。所以我想要一个应该可以工作的通用代码。
最简单和优化的方法之一是创建一个标记列的列表,并将其与withColumn
一起用作
pyspark
from pyspark.sql.functions import col
marksColumns = [col('marks1'), col('marks2')]
averageFunc = sum(x for x in marksColumns)/len(marksColumns)
df.withColumn('Result(Avg)', averageFunc).show(truncate=False)
你应该得到
+-----+------+------+-----------+
|name |marks1|marks2|Result(Avg)|
+-----+------+------+-----------+
|Alice|10 |20 |15.0 |
|Bob |20 |30 |25.0 |
+-----+------+------+-----------+
scala-spark
scala 中的过程与上述 python 中的过程几乎相同
import org.apache.spark.sql.functions.{col, lit}
val marksColumns = Array(col("marks1"), col("marks2"))
val averageFunc = marksColumns.foldLeft(lit(0)){(x, y) => x+y}/marksColumns.length
df.withColumn("Result(Avg)", averageFunc).show(false)
这应该会为您提供与 pyspark
中相同的输出
希望回答对你有帮助
就像使用用户定义函数一样简单。通过创建一个特定的 UDF 来处理许多列的平均值,您将能够根据需要多次重复使用它。
Python
在这个片段中,我创建了一个 UDF,它接受一个列数组,并计算它的平均值。
from pyspark.sql.functions import udf, array
from pyspark.sql.types import DoubleType
avg_cols = udf(lambda array: sum(array)/len(array), DoubleType())
df.withColumn("average", avg_cols(array("marks1", "marks2"))).show()
输出:
+-----+------+------+--------+
| name|marks1|marks2| average|
+-----+------+------+--------+
|Alice| 10| 20| 15.0|
| Bob| 20| 30| 25.0|
+-----+------+------+--------+
斯卡拉
使用 Scala API,您必须将 selected 列作为行处理。您只需使用 Spark struct
函数 select 列。
import org.apache.spark.sql.functions._
import spark.implicits._
import scala.util.Try
def average = udf((row: Row) => {
val values = row.toSeq.map(x => Try(x.toString.toDouble).toOption).filter(_.isDefined).map(_.get)
if(values.nonEmpty) values.sum / values.length else 0.0
})
df.withColumn("average", average(struct($"marks1", $"marks2"))).show()
如您所见,我使用 Try
将所有值转换为 Double,这样如果无法转换值,它不会抛出任何异常,仅对这些列执行平均值已定义。
仅此而已:)
假设我有一个 Dataset/Dataframe 包含以下内容:-
name, marks1, marks2
Alice, 10, 20
Bob, 20, 30
我想添加一个新列,它应该是 B 列和 C 列的平均值。
预期结果:-
name, marks1, marks2, Result(Avg)
Alice, 10, 20, 15
Bob, 20, 30, 25
用于求和或我使用的任何其他算术运算 df.withColumn("xyz", $"marks1"+$"marks2")
。我找不到 Average 的类似方法。请帮忙。
另外:- 列数不固定。有时它可能是 2 列的平均值,有时是 3 列甚至更多。所以我想要一个应该可以工作的通用代码。
最简单和优化的方法之一是创建一个标记列的列表,并将其与withColumn
一起用作
pyspark
from pyspark.sql.functions import col
marksColumns = [col('marks1'), col('marks2')]
averageFunc = sum(x for x in marksColumns)/len(marksColumns)
df.withColumn('Result(Avg)', averageFunc).show(truncate=False)
你应该得到
+-----+------+------+-----------+
|name |marks1|marks2|Result(Avg)|
+-----+------+------+-----------+
|Alice|10 |20 |15.0 |
|Bob |20 |30 |25.0 |
+-----+------+------+-----------+
scala-spark
scala 中的过程与上述 python 中的过程几乎相同
import org.apache.spark.sql.functions.{col, lit}
val marksColumns = Array(col("marks1"), col("marks2"))
val averageFunc = marksColumns.foldLeft(lit(0)){(x, y) => x+y}/marksColumns.length
df.withColumn("Result(Avg)", averageFunc).show(false)
这应该会为您提供与 pyspark
中相同的输出希望回答对你有帮助
就像使用用户定义函数一样简单。通过创建一个特定的 UDF 来处理许多列的平均值,您将能够根据需要多次重复使用它。
Python
在这个片段中,我创建了一个 UDF,它接受一个列数组,并计算它的平均值。
from pyspark.sql.functions import udf, array
from pyspark.sql.types import DoubleType
avg_cols = udf(lambda array: sum(array)/len(array), DoubleType())
df.withColumn("average", avg_cols(array("marks1", "marks2"))).show()
输出:
+-----+------+------+--------+
| name|marks1|marks2| average|
+-----+------+------+--------+
|Alice| 10| 20| 15.0|
| Bob| 20| 30| 25.0|
+-----+------+------+--------+
斯卡拉
使用 Scala API,您必须将 selected 列作为行处理。您只需使用 Spark struct
函数 select 列。
import org.apache.spark.sql.functions._
import spark.implicits._
import scala.util.Try
def average = udf((row: Row) => {
val values = row.toSeq.map(x => Try(x.toString.toDouble).toOption).filter(_.isDefined).map(_.get)
if(values.nonEmpty) values.sum / values.length else 0.0
})
df.withColumn("average", average(struct($"marks1", $"marks2"))).show()
如您所见,我使用 Try
将所有值转换为 Double,这样如果无法转换值,它不会抛出任何异常,仅对这些列执行平均值已定义。
仅此而已:)