SPARK SQL 等效于 Qualify + Row_number 语句
SPARK SQL Equivalent of Qualify + Row_number statements
有谁知道 Apache Spark SQL 实现与标准 SQL qualify() + rnk 或 row_number 语句相同结果的最佳方法吗?
例如:
- 我有一个名为 statement_data 的 Spark Dataframe,每个月有 12 条记录,每条记录 100 条唯一 account_numbers,因此总共有 1200 条记录
- 每个月度记录都有一个名为"statement_date"的字段,可用于确定最近的记录
我希望我的最终结果是一个新的 Spark Dataframe,其中包含 100 个唯一 account_numbers 中每一个的 3 个最新记录(由 statement_date 降序确定),因此 300 个最终记录在总计
在标准 Teradata SQL 中,我可以执行以下操作:
select * from statement_data
qualify row_number ()
over(partition by acct_id order by statement_date desc) <= 3
Apache Spark SQL 没有我知道的独立限定函数,也许我搞砸了语法或找不到存在限定的文档。
如果我需要分两步执行此操作,只要这两步是:
- select 查询或替代方法为每个 account_number 的记录分配 rank/row 编号
- 一个 select 查询,我正在 select 查询排名 <= 3 的所有记录(即选择第一、第二和第三最近的记录)。
编辑 1 - 7/23 2:09pm:
zero323 提供的初始解决方案在安装了 Spark SQL 1.4.1 依赖项的 Spark 1.4.1 中对我不起作用。
编辑 2 - 7/23 3:24pm:
事实证明该错误与我的查询使用 SQL 上下文对象而不是 Hive 上下文有关。添加以下代码以创建和使用 Hive 上下文后,我现在能够 运行 正确地执行以下解决方案:
final JavaSparkContext sc2;
final HiveContext hc2;
DataFrame df;
hc2 = TestHive$.MODULE$;
sc2 = new JavaSparkContext(hc2.sparkContext());
....
// Initial Spark/SQL contexts to set up Dataframes
SparkConf conf = new SparkConf().setAppName("Statement Test");
...
DataFrame stmtSummary =
hc2.sql("SELECT * FROM (SELECT acct_id, stmt_end_dt, stmt_curr_bal, row_number() over (partition by acct_id order by stmt_curr_bal DESC) rank_num FROM stmt_data) tmp WHERE rank_num <= 3");
没有qualify
(检查parser source通常很有用)但是你可以像这样使用子查询:
SELECT * FROM (
SELECT *, row_number() OVER (
PARTITION BY acct_id ORDER BY statement_date DESC
) rank FROM df
) tmp WHERE rank <= 3
另见 SPARK : failure: ``union'' expected but `(' found
有谁知道 Apache Spark SQL 实现与标准 SQL qualify() + rnk 或 row_number 语句相同结果的最佳方法吗?
例如:
- 我有一个名为 statement_data 的 Spark Dataframe,每个月有 12 条记录,每条记录 100 条唯一 account_numbers,因此总共有 1200 条记录
- 每个月度记录都有一个名为"statement_date"的字段,可用于确定最近的记录
我希望我的最终结果是一个新的 Spark Dataframe,其中包含 100 个唯一 account_numbers 中每一个的 3 个最新记录(由 statement_date 降序确定),因此 300 个最终记录在总计
在标准 Teradata SQL 中,我可以执行以下操作:
select * from statement_data
qualify row_number ()
over(partition by acct_id order by statement_date desc) <= 3
Apache Spark SQL 没有我知道的独立限定函数,也许我搞砸了语法或找不到存在限定的文档。
如果我需要分两步执行此操作,只要这两步是:
- select 查询或替代方法为每个 account_number 的记录分配 rank/row 编号
- 一个 select 查询,我正在 select 查询排名 <= 3 的所有记录(即选择第一、第二和第三最近的记录)。
编辑 1 - 7/23 2:09pm: zero323 提供的初始解决方案在安装了 Spark SQL 1.4.1 依赖项的 Spark 1.4.1 中对我不起作用。
编辑 2 - 7/23 3:24pm: 事实证明该错误与我的查询使用 SQL 上下文对象而不是 Hive 上下文有关。添加以下代码以创建和使用 Hive 上下文后,我现在能够 运行 正确地执行以下解决方案:
final JavaSparkContext sc2;
final HiveContext hc2;
DataFrame df;
hc2 = TestHive$.MODULE$;
sc2 = new JavaSparkContext(hc2.sparkContext());
....
// Initial Spark/SQL contexts to set up Dataframes
SparkConf conf = new SparkConf().setAppName("Statement Test");
...
DataFrame stmtSummary =
hc2.sql("SELECT * FROM (SELECT acct_id, stmt_end_dt, stmt_curr_bal, row_number() over (partition by acct_id order by stmt_curr_bal DESC) rank_num FROM stmt_data) tmp WHERE rank_num <= 3");
没有qualify
(检查parser source通常很有用)但是你可以像这样使用子查询:
SELECT * FROM (
SELECT *, row_number() OVER (
PARTITION BY acct_id ORDER BY statement_date DESC
) rank FROM df
) tmp WHERE rank <= 3
另见 SPARK : failure: ``union'' expected but `(' found