Spark aggregateByKey on Dataset
Here's an example of aggregateByKey on mutable.HashSet[String], written by @bbejeck:

import scala.collection.mutable

// Start each key with an empty set
val initialSet = mutable.HashSet.empty[String]
// Within a partition, fold each value into the set
val addToSet = (s: mutable.HashSet[String], v: String) => s += v
// Across partitions, merge the per-partition sets
val mergePartitionSets = (p1: mutable.HashSet[String], p2: mutable.HashSet[String]) => p1 ++= p2
val uniqueByKey = kv.aggregateByKey(initialSet)(addToSet, mergePartitionSets)
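For context, kv here is a pair RDD. A minimal way to exercise the snippet in the Spark shell, where sc is the shell's SparkContext; the sample data is an illustrative assumption, not part of the original:

// Illustrative sample data standing in for kv
val kv = sc.parallelize(Seq(("fruit", "apple"), ("fruit", "apple"), ("fruit", "pear"), ("veg", "kale")))
val uniqueByKey = kv.aggregateByKey(initialSet)(addToSet, mergePartitionSets)
uniqueByKey.collect().foreach(println)
// e.g. (veg,Set(kale)) and (fruit,Set(apple, pear)) -- duplicates collapsed per key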
But when I switch to a Dataset, I get the error below. Is this because Spark 2.0 (the version I'm using) doesn't support aggregateByKey on Datasets?
java.lang.NullPointerException
at org.apache.spark.sql.Dataset.schema(Dataset.scala:393)
at org.apache.spark.sql.Dataset.toDF(Dataset.scala:339)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:239)
at org.apache.spark.sql.Dataset.show(Dataset.scala:526)
at org.apache.spark.sql.Dataset.show(Dataset.scala:486)
at org.apache.spark.sql.Dataset.show(Dataset.scala:495)
Here's the code:
case class Food(name: String,
                price: String,
                e_date: String)
rdd.aggregateByKey( Seq(Food("", "", "")).toDS )(
(f1,f2) => f1.union(f2),
(f1,f2) => f1.union(f2))
/////////
found f1 = Invalid tree; null:
null
Any idea why this happens? Thanks in advance!
Yes, I believe aggregateByKey is only available on RDDs.
Here is the documentation (for Python):
http://spark.apache.org/docs/latest/api/python/pyspark.html
Remove the .toDS and try the code again. You could convert the result to a Dataset once the aggregation is done (not sure whether performance would be better); a sketch of that follows.
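A minimal sketch of that suggestion, aggregating plain Seq[Food] values on the RDD and converting to a Dataset only at the end. The (key, Food) pair shape and the sample data are my assumptions, not from the original post:

import org.apache.spark.sql.SparkSession

case class Food(name: String, price: String, e_date: String)

val spark = SparkSession.builder().appName("food-agg").master("local[*]").getOrCreate()
import spark.implicits._

// Assumed shape: an RDD of (key, Food) pairs
val rdd = spark.sparkContext.parallelize(Seq(
  ("store1", Food("apple", "1.00", "2016-09-01")),
  ("store1", Food("pear", "1.50", "2016-09-01")),
  ("store2", Food("kale", "2.00", "2016-09-02"))))

// Aggregate ordinary Scala collections, not Datasets, inside aggregateByKey
val foodsByKey = rdd.aggregateByKey(Seq.empty[Food])(
  (acc, f) => acc :+ f, // within a partition, append each Food
  (a, b) => a ++ b)     // across partitions, concatenate the sequences

// Convert to a Dataset only after the aggregation is done
val ds = foodsByKey.toDS()
ds.show(truncate = false)

The original NullPointerException comes from putting a Dataset inside the zero value: a Dataset is a driver-side handle tied to a SparkSession and cannot be serialized into executor-side aggregation functions, which is why it shows up as "Invalid tree; null" on the executors.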