在 Spark SQL 中使用 collect_list 和 collect_set
Use collect_list and collect_set in Spark SQL
根据docs, the collect_set
and collect_list
functions should be available in Spark SQL. However, I cannot get it to work. I'm running Spark 1.6.0 using a Docker image。
我正尝试在 Scala 中执行此操作:
import org.apache.spark.sql.functions._
df.groupBy("column1")
.agg(collect_set("column2"))
.show()
并在运行时收到以下错误:
Exception in thread "main" org.apache.spark.sql.AnalysisException: undefined function collect_set;
也尝试过使用pyspark
,但也失败了。文档说明这些函数是 Hive UDAF 的别名,但我不知道要启用这些函数。
如何解决这个问题?谢谢!
Spark 2.0+:
SPARK-10605 引入了本机 collect_list
和 collect_set
实现。不再需要具有 Hive 支持的 SparkSession
或 HiveContext
。
Spark 2.0-SNAPSHOT (2016-05-03 之前):
您必须为给定的 SparkSession
:
启用 Hive 支持
在 Scala 中:
val spark = SparkSession.builder
.master("local")
.appName("testing")
.enableHiveSupport() // <- enable Hive support.
.getOrCreate()
在Python中:
spark = (SparkSession.builder
.enableHiveSupport()
.getOrCreate())
Spark < 2.0:
为了能够使用 Hive UDF(参见 https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF),您已经使用了带有 Hive 支持的 Spark(当您使用预构建的二进制文件时,这已经涵盖了,这里似乎就是这种情况)并初始化SparkContext
使用 HiveContext
.
在 Scala 中:
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.SQLContext
val sqlContext: SQLContext = new HiveContext(sc)
在Python中:
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)
根据docs, the collect_set
and collect_list
functions should be available in Spark SQL. However, I cannot get it to work. I'm running Spark 1.6.0 using a Docker image。
我正尝试在 Scala 中执行此操作:
import org.apache.spark.sql.functions._
df.groupBy("column1")
.agg(collect_set("column2"))
.show()
并在运行时收到以下错误:
Exception in thread "main" org.apache.spark.sql.AnalysisException: undefined function collect_set;
也尝试过使用pyspark
,但也失败了。文档说明这些函数是 Hive UDAF 的别名,但我不知道要启用这些函数。
如何解决这个问题?谢谢!
Spark 2.0+:
SPARK-10605 引入了本机 collect_list
和 collect_set
实现。不再需要具有 Hive 支持的 SparkSession
或 HiveContext
。
Spark 2.0-SNAPSHOT (2016-05-03 之前):
您必须为给定的 SparkSession
:
在 Scala 中:
val spark = SparkSession.builder
.master("local")
.appName("testing")
.enableHiveSupport() // <- enable Hive support.
.getOrCreate()
在Python中:
spark = (SparkSession.builder
.enableHiveSupport()
.getOrCreate())
Spark < 2.0:
为了能够使用 Hive UDF(参见 https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF),您已经使用了带有 Hive 支持的 Spark(当您使用预构建的二进制文件时,这已经涵盖了,这里似乎就是这种情况)并初始化SparkContext
使用 HiveContext
.
在 Scala 中:
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.SQLContext
val sqlContext: SQLContext = new HiveContext(sc)
在Python中:
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)