Spark与SparkSQL:如何模仿window函数?
Spark and SparkSQL: How to imitate window function?
描述
给定一个数据帧df
id | date
---------------
1 | 2015-09-01
2 | 2015-09-01
1 | 2015-09-03
1 | 2015-09-04
2 | 2015-09-04
我想创建一个 运行 计数器或索引,
- 按相同的 id 分组并且
- 在该组中按日期排序,
因此
id | date | counter
--------------------------
1 | 2015-09-01 | 1
1 | 2015-09-03 | 2
1 | 2015-09-04 | 3
2 | 2015-09-01 | 1
2 | 2015-09-04 | 2
这是我可以用 window 函数实现的,例如
val w = Window.partitionBy("id").orderBy("date")
val resultDF = df.select( df("id"), rowNumber().over(w) )
不幸的是,Spark 1.4.1 不支持 window 常规数据帧的函数:
org.apache.spark.sql.AnalysisException: Could not resolve window function 'row_number'. Note that, using window functions currently requires a HiveContext;
问题
- 如何在不使用window函数的情况下在当前的Spark 1.4.1上实现上述计算?
- Spark 何时支持 window 常规数据帧的函数?
谢谢!
您也可以将 HiveContext
用于本地 DataFrames
,除非您有充分的理由不这样做,否则这可能是个好主意。它是 spark-shell
和 pyspark
shell 中可用的默认 SQLContext
(至于现在 sparkR
似乎使用普通的 SQLContext
)并且它的解析器是Spark SQL and DataFrame Guide.
推荐
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.rowNumber
object HiveContextTest {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Hive Context")
val sc = new SparkContext(conf)
val sqlContext = new HiveContext(sc)
import sqlContext.implicits._
val df = sc.parallelize(
("foo", 1) :: ("foo", 2) :: ("bar", 1) :: ("bar", 2) :: Nil
).toDF("k", "v")
val w = Window.partitionBy($"k").orderBy($"v")
df.select($"k", $"v", rowNumber.over(w).alias("rn")).show
}
}
您可以使用 RDD 来做到这一点。就我个人而言,我发现 RDD 的 API 更有意义——我并不总是希望我的数据像数据帧一样 'flat'。
val df = sqlContext.sql("select 1, '2015-09-01'"
).unionAll(sqlContext.sql("select 2, '2015-09-01'")
).unionAll(sqlContext.sql("select 1, '2015-09-03'")
).unionAll(sqlContext.sql("select 1, '2015-09-04'")
).unionAll(sqlContext.sql("select 2, '2015-09-04'"))
// dataframe as an RDD (of Row objects)
df.rdd
// grouping by the first column of the row
.groupBy(r => r(0))
// map each group - an Iterable[Row] - to a list and sort by the second column
.map(g => g._2.toList.sortBy(row => row(1).toString))
.collect()
上面的结果如下:
Array[List[org.apache.spark.sql.Row]] =
Array(
List([1,2015-09-01], [1,2015-09-03], [1,2015-09-04]),
List([2,2015-09-01], [2,2015-09-04]))
如果你也想要'group'中的位置,你可以使用zipWithIndex
。
df.rdd.groupBy(r => r(0)).map(g =>
g._2.toList.sortBy(row => row(1).toString).zipWithIndex).collect()
Array[List[(org.apache.spark.sql.Row, Int)]] = Array(
List(([1,2015-09-01],0), ([1,2015-09-03],1), ([1,2015-09-04],2)),
List(([2,2015-09-01],0), ([2,2015-09-04],1)))
您 可以 使用 FlatMap 将其扁平化为一个简单的 List/Array 个 Row
对象,但是如果您需要对 'group' 这不是个好主意。
像这样使用 RDD 的缺点是从 DataFrame 转换为 RDD 并再次转换回来非常乏味。
我完全同意 Window 如果您有 Spark 版本 (>=)1.5,DataFrames 的函数是可行的。但是如果你真的坚持使用旧版本(例如 1.4.1),这里有一个解决这个问题的 hacky 方法
val df = sc.parallelize((1, "2015-09-01") :: (2, "2015-09-01") :: (1, "2015-09-03") :: (1, "2015-09-04") :: (1, "2015-09-04") :: Nil)
.toDF("id", "date")
val dfDuplicate = df.selecExpr("id as idDup", "date as dateDup")
val dfWithCounter = df.join(dfDuplicate,$"id"===$"idDup")
.where($"date"<=$"dateDup")
.groupBy($"id", $"date")
.agg($"id", $"date", count($"idDup").as("counter"))
.select($"id",$"date",$"counter")
现在如果你这样做 dfWithCounter.show
您将获得:
+---+----------+-------+
| id| date|counter|
+---+----------+-------+
| 1|2015-09-01| 1|
| 1|2015-09-04| 3|
| 1|2015-09-03| 2|
| 2|2015-09-01| 1|
| 2|2015-09-04| 2|
+---+----------+-------+
请注意,date
未排序,但 counter
是正确的。您还可以通过在 where
语句中将 <=
更改为 >=
来更改 counter
的顺序。
描述
给定一个数据帧df
id | date
---------------
1 | 2015-09-01
2 | 2015-09-01
1 | 2015-09-03
1 | 2015-09-04
2 | 2015-09-04
我想创建一个 运行 计数器或索引,
- 按相同的 id 分组并且
- 在该组中按日期排序,
因此
id | date | counter
--------------------------
1 | 2015-09-01 | 1
1 | 2015-09-03 | 2
1 | 2015-09-04 | 3
2 | 2015-09-01 | 1
2 | 2015-09-04 | 2
这是我可以用 window 函数实现的,例如
val w = Window.partitionBy("id").orderBy("date")
val resultDF = df.select( df("id"), rowNumber().over(w) )
不幸的是,Spark 1.4.1 不支持 window 常规数据帧的函数:
org.apache.spark.sql.AnalysisException: Could not resolve window function 'row_number'. Note that, using window functions currently requires a HiveContext;
问题
- 如何在不使用window函数的情况下在当前的Spark 1.4.1上实现上述计算?
- Spark 何时支持 window 常规数据帧的函数?
谢谢!
您也可以将 HiveContext
用于本地 DataFrames
,除非您有充分的理由不这样做,否则这可能是个好主意。它是 spark-shell
和 pyspark
shell 中可用的默认 SQLContext
(至于现在 sparkR
似乎使用普通的 SQLContext
)并且它的解析器是Spark SQL and DataFrame Guide.
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.rowNumber
object HiveContextTest {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Hive Context")
val sc = new SparkContext(conf)
val sqlContext = new HiveContext(sc)
import sqlContext.implicits._
val df = sc.parallelize(
("foo", 1) :: ("foo", 2) :: ("bar", 1) :: ("bar", 2) :: Nil
).toDF("k", "v")
val w = Window.partitionBy($"k").orderBy($"v")
df.select($"k", $"v", rowNumber.over(w).alias("rn")).show
}
}
您可以使用 RDD 来做到这一点。就我个人而言,我发现 RDD 的 API 更有意义——我并不总是希望我的数据像数据帧一样 'flat'。
val df = sqlContext.sql("select 1, '2015-09-01'"
).unionAll(sqlContext.sql("select 2, '2015-09-01'")
).unionAll(sqlContext.sql("select 1, '2015-09-03'")
).unionAll(sqlContext.sql("select 1, '2015-09-04'")
).unionAll(sqlContext.sql("select 2, '2015-09-04'"))
// dataframe as an RDD (of Row objects)
df.rdd
// grouping by the first column of the row
.groupBy(r => r(0))
// map each group - an Iterable[Row] - to a list and sort by the second column
.map(g => g._2.toList.sortBy(row => row(1).toString))
.collect()
上面的结果如下:
Array[List[org.apache.spark.sql.Row]] =
Array(
List([1,2015-09-01], [1,2015-09-03], [1,2015-09-04]),
List([2,2015-09-01], [2,2015-09-04]))
如果你也想要'group'中的位置,你可以使用zipWithIndex
。
df.rdd.groupBy(r => r(0)).map(g =>
g._2.toList.sortBy(row => row(1).toString).zipWithIndex).collect()
Array[List[(org.apache.spark.sql.Row, Int)]] = Array(
List(([1,2015-09-01],0), ([1,2015-09-03],1), ([1,2015-09-04],2)),
List(([2,2015-09-01],0), ([2,2015-09-04],1)))
您 可以 使用 FlatMap 将其扁平化为一个简单的 List/Array 个 Row
对象,但是如果您需要对 'group' 这不是个好主意。
像这样使用 RDD 的缺点是从 DataFrame 转换为 RDD 并再次转换回来非常乏味。
我完全同意 Window 如果您有 Spark 版本 (>=)1.5,DataFrames 的函数是可行的。但是如果你真的坚持使用旧版本(例如 1.4.1),这里有一个解决这个问题的 hacky 方法
val df = sc.parallelize((1, "2015-09-01") :: (2, "2015-09-01") :: (1, "2015-09-03") :: (1, "2015-09-04") :: (1, "2015-09-04") :: Nil)
.toDF("id", "date")
val dfDuplicate = df.selecExpr("id as idDup", "date as dateDup")
val dfWithCounter = df.join(dfDuplicate,$"id"===$"idDup")
.where($"date"<=$"dateDup")
.groupBy($"id", $"date")
.agg($"id", $"date", count($"idDup").as("counter"))
.select($"id",$"date",$"counter")
现在如果你这样做 dfWithCounter.show
您将获得:
+---+----------+-------+
| id| date|counter|
+---+----------+-------+
| 1|2015-09-01| 1|
| 1|2015-09-04| 3|
| 1|2015-09-03| 2|
| 2|2015-09-01| 1|
| 2|2015-09-04| 2|
+---+----------+-------+
请注意,date
未排序,但 counter
是正确的。您还可以通过在 where
语句中将 <=
更改为 >=
来更改 counter
的顺序。