Spark 数据集 - 在对数据集进行筛选时出现 NullPointerException
Spark Dataset - NullPointerException while doing a filter on dataset
我有 2 个数据集,如下所示。我试图找出有多少产品与每个游戏相关联。基本上,我正在尝试统计相关产品的数量。
scala> df1.show()
gameid | games | users | cnt_assoc_prod
-------------------------------------------
1 | cricket |[111, 121] |
2 | basketball|[211] |
3 | skating |[101, 100, 98] |
scala> df2.show()
user | products
----------------------
98 | "shampoo"
100 | "soap"
101 | "shampoo"
111 | "shoes"
121 | "honey"
211 | "shoes"
我试图从数组中遍历 df1 的每个用户,并通过在与用户匹配的列上应用过滤器来找到 df2 中的相应行。
df1.map{x => {
var assoc_products = new Set()
x.users.foreach(y => assoc_products + df2.filter(z => z.user == y).first().
products)
x.cnt_assoc_prod = assoc_products.size
}
在应用过滤器时我得到以下异常
java.lang.NullPointerException
at org.apache.spark.sql.Dataset.logicalPlan(Dataset.scala:784)
at org.apache.spark.sql.Dataset.mapPartitions(Dataset.scala:344)
at org.apache.spark.sql.Dataset.filter(Dataset.scala:307)
我使用的是 1.6.1 版的 spark。
您可以分解 df1
中的 users
列,与 user
列中的 df2
连接,然后执行groupBy
计数:
(df1.withColumn("user", explode(col("users")))
.join(df2, Seq("user"))
.groupBy("gameid", "games")
.agg(count($"products").alias("cnt_assoc_prod"))
).show
+------+----------+--------------+
|gameid| games|cnt_assoc_prod|
+------+----------+--------------+
| 3| skating| 3|
| 2|basketball| 1|
| 1| cricket| 2|
+------+----------+--------------+
我有 2 个数据集,如下所示。我试图找出有多少产品与每个游戏相关联。基本上,我正在尝试统计相关产品的数量。
scala> df1.show()
gameid | games | users | cnt_assoc_prod
-------------------------------------------
1 | cricket |[111, 121] |
2 | basketball|[211] |
3 | skating |[101, 100, 98] |
scala> df2.show()
user | products
----------------------
98 | "shampoo"
100 | "soap"
101 | "shampoo"
111 | "shoes"
121 | "honey"
211 | "shoes"
我试图从数组中遍历 df1 的每个用户,并通过在与用户匹配的列上应用过滤器来找到 df2 中的相应行。
df1.map{x => {
var assoc_products = new Set()
x.users.foreach(y => assoc_products + df2.filter(z => z.user == y).first().
products)
x.cnt_assoc_prod = assoc_products.size
}
在应用过滤器时我得到以下异常
java.lang.NullPointerException
at org.apache.spark.sql.Dataset.logicalPlan(Dataset.scala:784)
at org.apache.spark.sql.Dataset.mapPartitions(Dataset.scala:344)
at org.apache.spark.sql.Dataset.filter(Dataset.scala:307)
我使用的是 1.6.1 版的 spark。
您可以分解 df1
中的 users
列,与 user
列中的 df2
连接,然后执行groupBy
计数:
(df1.withColumn("user", explode(col("users")))
.join(df2, Seq("user"))
.groupBy("gameid", "games")
.agg(count($"products").alias("cnt_assoc_prod"))
).show
+------+----------+--------------+
|gameid| games|cnt_assoc_prod|
+------+----------+--------------+
| 3| skating| 3|
| 2|basketball| 1|
| 1| cricket| 2|
+------+----------+--------------+