SPARK SQL 等效于 Qualify + Row_number 语句

SPARK SQL Equivalent of Qualify + Row_number statements

有谁知道 Apache Spark SQL 实现与标准 SQL qualify() + rnk 或 row_number 语句相同结果的最佳方法吗?

例如:

我希望我的最终结果是一个新的 Spark Dataframe,其中包含 100 个唯一 account_numbers 中每一个的 3 个最新记录(由 statement_date 降序确定),因此 300 个最终记录在总计

在标准 Teradata SQL 中,我可以执行以下操作:

select * from statement_data
qualify row_number ()
over(partition by acct_id order by statement_date desc) <= 3

Apache Spark SQL 没有我知道的独立限定函数,也许我搞砸了语法或找不到存在限定的文档。

如果我需要分两步执行此操作,只要这两步是:

编辑 1 - 7/23 2:09pm: zero323 提供的初始解决方案在安装了 Spark SQL 1.4.1 依赖项的 Spark 1.4.1 中对我不起作用。

编辑 2 - 7/23 3:24pm: 事实证明该错误与我的查询使用 SQL 上下文对象而不是 Hive 上下文有关。添加以下代码以创建和使用 Hive 上下文后,我现在能够 运行 正确地执行以下解决方案:

final JavaSparkContext sc2;
final HiveContext hc2;
DataFrame df;
hc2 = TestHive$.MODULE$;
sc2 = new JavaSparkContext(hc2.sparkContext()); 
....
// Initial Spark/SQL contexts to set up Dataframes  
SparkConf conf = new SparkConf().setAppName("Statement Test");
...
DataFrame stmtSummary = 
    hc2.sql("SELECT * FROM (SELECT acct_id, stmt_end_dt, stmt_curr_bal, row_number() over (partition by acct_id order by stmt_curr_bal DESC) rank_num FROM stmt_data) tmp WHERE rank_num <= 3");

没有qualify(检查parser source通常很有用)但是你可以像这样使用子查询:

SELECT * FROM (
    SELECT *, row_number() OVER (
        PARTITION BY acct_id ORDER BY statement_date DESC
    ) rank FROM df
 ) tmp WHERE rank <= 3

另见 SPARK : failure: ``union'' expected but `(' found