Convert HiveQL into Spark Scala
I want to convert a HiveQL query that uses a window function into a Scala Spark query... but I keep getting the same exception.
Problem context: mytable consists of the fields category and product. I'd like to get a list of the top N most frequent products for each category. The DataFrame df used below was obtained from a HiveContext object.
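For reference, a minimal setup consistent with this description might look like the sketch below (the variable names sc and hiveContext are assumptions for illustration, not from the original post; in Spark 1.x, window functions require a HiveContext):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

// Assumed setup (hypothetical names): df holds the rows of mytable.
val sc = new SparkContext(new SparkConf().setAppName("top-n-products"))
val hiveContext = new HiveContext(sc)
val df = hiveContext.table("mytable")   // columns: category, product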
Original query (runs correctly):
SELECT category, product, freq FROM (
SELECT category, product, COUNT(*) AS freq,
ROW_NUMBER() OVER (PARTITION BY category ORDER BY COUNT(*) DESC) as seqnum
FROM mytable GROUP BY category, product) ci
WHERE seqnum <= 10;
What I have now (a partial conversion that doesn't work):
val w = row_number().over(Window.partitionBy("category").orderBy(count("*").desc))
val result = df.select("category", "product").groupBy("category", "product").agg(count("*").as("freq"))
val new_res = result.withColumn("seqNum", w).where(col("seqNum") <= 10).drop("seqNum")
I keep getting the following exception:
Exception in thread "main" org.apache.spark.sql.AnalysisException: expression 'category' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;
What's wrong here?
Your mistake is using an aggregate inside the orderBy clause:

.orderBy(count("*").desc)

Written this way, the expression introduces a new aggregate expression instead of reusing the one you already computed. Reference the existing freq column by name instead, in descending order so the most frequent products come first (matching ORDER BY COUNT(*) DESC in the original query):

.orderBy(col("freq").desc)

So your code should look like this:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, count, row_number}

// Rank products within each category by the frequency computed in `result`.
val w = row_number().over(
  Window.partitionBy("category").orderBy(col("freq").desc))

val result = df.select("category", "product")
  .groupBy("category", "product")
  .agg(count("*").as("freq"))

val new_res = result
  .withColumn("seqNum", w).where(col("seqNum") <= 10)
  .drop("seqNum")
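As a quick sanity check, a sketch like the following (reusing the imports and hiveContext from the setup above, with made-up sample data) shows the window keeping only the most frequent products per category; all names and rows here are illustrative assumptions:

// Hypothetical sample data to exercise the query above.
val sample = hiveContext.createDataFrame(Seq(
  ("books", "novel"), ("books", "novel"), ("books", "atlas"),
  ("toys", "kite"), ("toys", "kite"), ("toys", "kite"),
  ("toys", "ball"), ("toys", "ball"), ("toys", "puzzle")
)).toDF("category", "product")

val freqs = sample.groupBy("category", "product").agg(count("*").as("freq"))
val top2 = freqs
  .withColumn("seqNum", row_number().over(
    Window.partitionBy("category").orderBy(col("freq").desc)))
  .where(col("seqNum") <= 2)   // top 2 per category for this small example
  .drop("seqNum")

top2.show()
// Expected rows: (books, novel, 2), (books, atlas, 1), (toys, kite, 3), (toys, ball, 2)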