如何使用计算组聚合数据

How to aggregate data using computed groups

我的数据以(大致)

的形式存储在 Spark 数据框中
Col1 Col2

A1   -5
B1   -20
C1   7
A2   3
B2   -4
C2   17

我想把它变成:

Col3 Col4

A    2
B   -24
C    24

(将 A 的数字相加并将 X1 和 X1 连接到 X)

如何使用数据框 API 执行此操作?

编辑:

col1 值实际上是任意字符串(端点),我想将其连接成一列(跨度),可能采用 "A1-A2" 的形式。我计划将端点映射到 Map 中的其他端点并在我的 UDF 中查询它。我的UDF可以return None吗? - 假设我根本不想在 col3 中包含 A,但我确实想包含 BC,我可以在您的示例中添加另一个案例吗以便在将 col1 映射到 col3 时跳过 A 行?

您可以简单地提取组列并将其用作聚合组。假设您的数据遵循示例中的模式:

原始 SQL:

case class Record(Col1: String, Col2: Int)

val df = sqlContext.createDataFrame(Seq(
    Record("A1", -5),
    Record("B1", -20),
    Record("C1", 7),
    Record("A2", 3),
    Record("B2", -4),
    Record("C2", 17)))

df.registerTempTable("df")

sqlContext.sql(
    """SELECT col3, sum(col2) AS col4 FROM (
        SELECT col2, SUBSTR(Col1, 1, 1) AS col3 FROM df
    ) tmp GROUP BY col3""").show

+----+----+
|col3|col4|
+----+----+
|   A|  -2|
|   B| -24|
|   C|  24|
+----+----+

使用 Scala API:

import org.apache.spark.sql.functions.{udf, sum}

val getGroup = udf((s: String) => s.substring(0, 1))

df
  .select(getGroup($"col1").alias("col3"), $"col2")
  .groupBy($"col3")
  .agg(sum($"col2").alias("col4"))

+----+----+
|col3|col4|
+----+----+
|   A|  -2|
|   B| -24|
|   C|  24|
+----+----+

如果分组模式比较复杂,您可以简单地调整SUBSTRgetGroup功能。例如:

val getGroup = {
  val pattern = "^[A-Z]+".r
    udf((s: String) => pattern.findFirstIn(s) match {
      case Some(g) => g
      case None => "Unknown"
  })
}

编辑 :

如果您想忽略某些组,只需添加 WHERE 子句即可。使用 raw SQL 很简单,但使用 Scala API 需要一些努力:

 import org.apache.spark.sql.functions.{not, lit}

 df
   .select(...) // As before
   .where(not($"col3".in(lit("A"))))
   .groupBy(...).agg(...) // As before

如果你想丢弃多列,你可以使用可变参数:

val toDiscard = List("A", "B").map(lit(_))

df
    .select(...)
    .where(not($"col3".in(toDiscard: _*)))
    .groupBy(...).agg(...) // As before

Can my UDF return None?

不能,但可以return null:

val getGroup2 = udf((s: String) => s.substring(0, 1) match {
    case x if x != "A" => x
    case _ => null: String
})

 df
   .select(getGroup2($"col1").alias("col3"), $"col2")
   .where($"col3".isNotNull)
   .groupBy(...).agg(...) // As before

+----+----+
|col3|col4|
+----+----+
|   B| -24|
|   C|  24|
+----+----+