如何使用计算组聚合数据
How to aggregate data using computed groups
我的数据以(大致)
的形式存储在 Spark 数据框中
Col1 Col2
A1 -5
B1 -20
C1 7
A2 3
B2 -4
C2 17
我想把它变成:
Col3 Col4
A 2
B -24
C 24
(将 A 的数字相加并将 X1 和 X1 连接到 X)
如何使用数据框 API 执行此操作?
编辑:
col1
值实际上是任意字符串(端点),我想将其连接成一列(跨度),可能采用 "A1-A2" 的形式。我计划将端点映射到 Map 中的其他端点并在我的 UDF 中查询它。我的UDF可以return None吗? - 假设我根本不想在 col3
中包含 A
,但我确实想包含 B
和 C
,我可以在您的示例中添加另一个案例吗以便在将 col1 映射到 col3 时跳过 A
行?
您可以简单地提取组列并将其用作聚合组。假设您的数据遵循示例中的模式:
原始 SQL:
case class Record(Col1: String, Col2: Int)
val df = sqlContext.createDataFrame(Seq(
Record("A1", -5),
Record("B1", -20),
Record("C1", 7),
Record("A2", 3),
Record("B2", -4),
Record("C2", 17)))
df.registerTempTable("df")
sqlContext.sql(
"""SELECT col3, sum(col2) AS col4 FROM (
SELECT col2, SUBSTR(Col1, 1, 1) AS col3 FROM df
) tmp GROUP BY col3""").show
+----+----+
|col3|col4|
+----+----+
| A| -2|
| B| -24|
| C| 24|
+----+----+
使用 Scala API:
import org.apache.spark.sql.functions.{udf, sum}
val getGroup = udf((s: String) => s.substring(0, 1))
df
.select(getGroup($"col1").alias("col3"), $"col2")
.groupBy($"col3")
.agg(sum($"col2").alias("col4"))
+----+----+
|col3|col4|
+----+----+
| A| -2|
| B| -24|
| C| 24|
+----+----+
如果分组模式比较复杂,您可以简单地调整SUBSTR
或getGroup
功能。例如:
val getGroup = {
val pattern = "^[A-Z]+".r
udf((s: String) => pattern.findFirstIn(s) match {
case Some(g) => g
case None => "Unknown"
})
}
编辑 :
如果您想忽略某些组,只需添加 WHERE
子句即可。使用 raw SQL 很简单,但使用 Scala API 需要一些努力:
import org.apache.spark.sql.functions.{not, lit}
df
.select(...) // As before
.where(not($"col3".in(lit("A"))))
.groupBy(...).agg(...) // As before
如果你想丢弃多列,你可以使用可变参数:
val toDiscard = List("A", "B").map(lit(_))
df
.select(...)
.where(not($"col3".in(toDiscard: _*)))
.groupBy(...).agg(...) // As before
Can my UDF return None?
不能,但可以return null
:
val getGroup2 = udf((s: String) => s.substring(0, 1) match {
case x if x != "A" => x
case _ => null: String
})
df
.select(getGroup2($"col1").alias("col3"), $"col2")
.where($"col3".isNotNull)
.groupBy(...).agg(...) // As before
+----+----+
|col3|col4|
+----+----+
| B| -24|
| C| 24|
+----+----+
我的数据以(大致)
的形式存储在 Spark 数据框中Col1 Col2
A1 -5
B1 -20
C1 7
A2 3
B2 -4
C2 17
我想把它变成:
Col3 Col4
A 2
B -24
C 24
(将 A 的数字相加并将 X1 和 X1 连接到 X)
如何使用数据框 API 执行此操作?
编辑:
col1
值实际上是任意字符串(端点),我想将其连接成一列(跨度),可能采用 "A1-A2" 的形式。我计划将端点映射到 Map 中的其他端点并在我的 UDF 中查询它。我的UDF可以return None吗? - 假设我根本不想在 col3
中包含 A
,但我确实想包含 B
和 C
,我可以在您的示例中添加另一个案例吗以便在将 col1 映射到 col3 时跳过 A
行?
您可以简单地提取组列并将其用作聚合组。假设您的数据遵循示例中的模式:
原始 SQL:
case class Record(Col1: String, Col2: Int)
val df = sqlContext.createDataFrame(Seq(
Record("A1", -5),
Record("B1", -20),
Record("C1", 7),
Record("A2", 3),
Record("B2", -4),
Record("C2", 17)))
df.registerTempTable("df")
sqlContext.sql(
"""SELECT col3, sum(col2) AS col4 FROM (
SELECT col2, SUBSTR(Col1, 1, 1) AS col3 FROM df
) tmp GROUP BY col3""").show
+----+----+
|col3|col4|
+----+----+
| A| -2|
| B| -24|
| C| 24|
+----+----+
使用 Scala API:
import org.apache.spark.sql.functions.{udf, sum}
val getGroup = udf((s: String) => s.substring(0, 1))
df
.select(getGroup($"col1").alias("col3"), $"col2")
.groupBy($"col3")
.agg(sum($"col2").alias("col4"))
+----+----+
|col3|col4|
+----+----+
| A| -2|
| B| -24|
| C| 24|
+----+----+
如果分组模式比较复杂,您可以简单地调整SUBSTR
或getGroup
功能。例如:
val getGroup = {
val pattern = "^[A-Z]+".r
udf((s: String) => pattern.findFirstIn(s) match {
case Some(g) => g
case None => "Unknown"
})
}
编辑 :
如果您想忽略某些组,只需添加 WHERE
子句即可。使用 raw SQL 很简单,但使用 Scala API 需要一些努力:
import org.apache.spark.sql.functions.{not, lit}
df
.select(...) // As before
.where(not($"col3".in(lit("A"))))
.groupBy(...).agg(...) // As before
如果你想丢弃多列,你可以使用可变参数:
val toDiscard = List("A", "B").map(lit(_))
df
.select(...)
.where(not($"col3".in(toDiscard: _*)))
.groupBy(...).agg(...) // As before
Can my UDF return None?
不能,但可以return null
:
val getGroup2 = udf((s: String) => s.substring(0, 1) match {
case x if x != "A" => x
case _ => null: String
})
df
.select(getGroup2($"col1").alias("col3"), $"col2")
.where($"col3".isNotNull)
.groupBy(...).agg(...) // As before
+----+----+
|col3|col4|
+----+----+
| B| -24|
| C| 24|
+----+----+