Spark Dataframe GroupBy 和计算 Complex 聚合函数
Spark Dataframe GroupBy and compute Complex aggregate function
使用 Spark dataframe ,我需要使用以下方法计算百分比
复杂公式:
按 "KEY " 分组并计算 "re_pct" 为 ( sum(sa) / sum( sa / (pct/100) ) ) * 100
例如,输入数据帧是
val values1 = List(List("01", "20000", "45.30"), List("01", "30000", "45.30"))
.map(row => (row(0), row(1), row(2)))
val DS1 = values1.toDF("KEY", "SA", "PCT")
DS1.show()
+---+-----+-----+
|KEY| SA| PCT|
+---+-----+-----+
| 01|20000|45.30|
| 01|30000|45.30|
+---+-----+-----+
预期结果:
+---+-----+--------------+
|KEY| re_pcnt |
+---+-----+--------------+
| 01| 45.30000038505 |
+---+-----+--------------+
我试着计算如下
val result = DS1.groupBy("KEY").agg(((sum("SA").divide(
sum(
("SA").divide(
("PCT").divide(100)
)
)
)) * 100).as("re_pcnt"))
但遇到错误:(36, 16) value divide is not a member of String ("SA").divide({
对实现上述逻辑有什么建议吗?
您的代码几乎可以完美运行。您只需添加“$”符号即可指定您要传递的列:
val result = DS1.groupBy($"KEY").agg(((sum($"SA").divide(
sum(
($"SA").divide(
($"PCT").divide(100)
)
)
)) * 100).as("re_pcnt"))
这是输出:
result.show()
+---+-------+
|KEY|re_pcnt|
+---+-------+
| 01| 45.3|
+---+-------+
您可以尝试导入 spark.implicits._
然后使用 $
来引用列。
val spark = SparkSession.builder.getOrCreate()
import spark.implicits._
val result = DS1.groupBy("KEY")
.agg(((sum($"SA").divide(sum(($"SA").divide(($"PCT").divide(100))))) * 100)
.as("re_pcnt"))
这将为您提供请求的输出。
如果您不想导入,您可以随时使用 col()
命令代替 $
。
可以使用 expr()
将字符串用作 agg()
函数的输入。但是,输入字符串需要稍微改变一下。以下给出与之前完全相同的结果,但使用字符串代替:
val opr = "sum(SA)/(sum(SA/(PCT/100))) * 100"
val df = DS1.groupBy("KEY").agg(expr(opr).as("re_pcnt"))
注意.as("re_pcnt")
需要在agg()
方法里面,不能在外面
使用 Spark dataframe ,我需要使用以下方法计算百分比 复杂公式:
按 "KEY " 分组并计算 "re_pct" 为 ( sum(sa) / sum( sa / (pct/100) ) ) * 100
例如,输入数据帧是
val values1 = List(List("01", "20000", "45.30"), List("01", "30000", "45.30"))
.map(row => (row(0), row(1), row(2)))
val DS1 = values1.toDF("KEY", "SA", "PCT")
DS1.show()
+---+-----+-----+
|KEY| SA| PCT|
+---+-----+-----+
| 01|20000|45.30|
| 01|30000|45.30|
+---+-----+-----+
预期结果:
+---+-----+--------------+
|KEY| re_pcnt |
+---+-----+--------------+
| 01| 45.30000038505 |
+---+-----+--------------+
我试着计算如下
val result = DS1.groupBy("KEY").agg(((sum("SA").divide(
sum(
("SA").divide(
("PCT").divide(100)
)
)
)) * 100).as("re_pcnt"))
但遇到错误:(36, 16) value divide is not a member of String ("SA").divide({
对实现上述逻辑有什么建议吗?
您的代码几乎可以完美运行。您只需添加“$”符号即可指定您要传递的列:
val result = DS1.groupBy($"KEY").agg(((sum($"SA").divide(
sum(
($"SA").divide(
($"PCT").divide(100)
)
)
)) * 100).as("re_pcnt"))
这是输出:
result.show()
+---+-------+
|KEY|re_pcnt|
+---+-------+
| 01| 45.3|
+---+-------+
您可以尝试导入 spark.implicits._
然后使用 $
来引用列。
val spark = SparkSession.builder.getOrCreate()
import spark.implicits._
val result = DS1.groupBy("KEY")
.agg(((sum($"SA").divide(sum(($"SA").divide(($"PCT").divide(100))))) * 100)
.as("re_pcnt"))
这将为您提供请求的输出。
如果您不想导入,您可以随时使用 col()
命令代替 $
。
可以使用 expr()
将字符串用作 agg()
函数的输入。但是,输入字符串需要稍微改变一下。以下给出与之前完全相同的结果,但使用字符串代替:
val opr = "sum(SA)/(sum(SA/(PCT/100))) * 100"
val df = DS1.groupBy("KEY").agg(expr(opr).as("re_pcnt"))
注意.as("re_pcnt")
需要在agg()
方法里面,不能在外面