RDD 转换和操作只能由驱动程序调用
RDD transformations and actions can only be invoked by the driver
错误:
org.apache.spark.SparkException: RDD 转换和操作只能由驱动程序调用,不能在其他转换内部调用;例如,rdd1.map(x => rdd2.values.count() * x) 无效,因为无法在 rdd1.map 转换内部执行值转换和计数操作。有关详细信息,请参阅 SPARK-5063。
def computeRatio(model: MatrixFactorizationModel, test_data: org.apache.spark.rdd.RDD[Rating]): Double = {
val numDistinctUsers = test_data.map(x => x.user).distinct().count()
val userRecs: RDD[(Int, Set[Int], Set[Int])] = test_data.groupBy(testUser => testUser.user).map(u => {
(u._1, u._2.map(p => p.product).toSet, model.recommendProducts(u._1, 20).map(prec => prec.product).toSet)
})
val hitsAndMiss: RDD[(Int, Double)] = userRecs.map(x => (x._1, x._2.intersect(x._3).size.toDouble))
val hits = hitsAndMiss.map(x => x._2).sum() / numDistinctUsers
return hits
}
我正在使用 MatrixFactorizationModel.scala
中的方法,我必须映射用户,然后调用该方法来获取每个用户的结果。通过这样做,我引入了我认为导致问题的嵌套映射:
我知道这个问题实际上发生在:
val userRecs: RDD[(Int, Set[Int], Set[Int])] = test_data.groupBy(testUser => testUser.user).map(u => {
(u._1, u._2.map(p => p.product).toSet, model.recommendProducts(u._1, 20).map(prec => prec.product).toSet)
})
因为在映射时我正在调用 model.recommendProducts
MatrixFactorizationModel
是分布式模型,因此您不能简单地从操作或转换中调用它。最接近你在这里做的事情是这样的:
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.recommendation.{MatrixFactorizationModel, Rating}
def computeRatio(model: MatrixFactorizationModel, testUsers: RDD[Rating]) = {
val testData = testUsers.map(r => (r.user, r.product)).groupByKey
val n = testData.count
val recommendations = model
.recommendProductsForUsers(20)
.mapValues(_.map(r => r.product))
val hits = testData
.join(recommendations)
.values
.map{case (xs, ys) => xs.toSet.intersect(ys.toSet).size}
.sum
hits / n
}
备注:
distinct
是一项昂贵的操作,在这里完全过时,因为您可以从分组数据中获得相同的信息
- 而不是
groupBy
后跟投影 (map
),先投影后分组。如果您只想要一个产品 ID,则没有理由转移完整评级。
错误:
org.apache.spark.SparkException: RDD 转换和操作只能由驱动程序调用,不能在其他转换内部调用;例如,rdd1.map(x => rdd2.values.count() * x) 无效,因为无法在 rdd1.map 转换内部执行值转换和计数操作。有关详细信息,请参阅 SPARK-5063。
def computeRatio(model: MatrixFactorizationModel, test_data: org.apache.spark.rdd.RDD[Rating]): Double = {
val numDistinctUsers = test_data.map(x => x.user).distinct().count()
val userRecs: RDD[(Int, Set[Int], Set[Int])] = test_data.groupBy(testUser => testUser.user).map(u => {
(u._1, u._2.map(p => p.product).toSet, model.recommendProducts(u._1, 20).map(prec => prec.product).toSet)
})
val hitsAndMiss: RDD[(Int, Double)] = userRecs.map(x => (x._1, x._2.intersect(x._3).size.toDouble))
val hits = hitsAndMiss.map(x => x._2).sum() / numDistinctUsers
return hits
}
我正在使用 MatrixFactorizationModel.scala
中的方法,我必须映射用户,然后调用该方法来获取每个用户的结果。通过这样做,我引入了我认为导致问题的嵌套映射:
我知道这个问题实际上发生在:
val userRecs: RDD[(Int, Set[Int], Set[Int])] = test_data.groupBy(testUser => testUser.user).map(u => {
(u._1, u._2.map(p => p.product).toSet, model.recommendProducts(u._1, 20).map(prec => prec.product).toSet)
})
因为在映射时我正在调用 model.recommendProducts
MatrixFactorizationModel
是分布式模型,因此您不能简单地从操作或转换中调用它。最接近你在这里做的事情是这样的:
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.recommendation.{MatrixFactorizationModel, Rating}
def computeRatio(model: MatrixFactorizationModel, testUsers: RDD[Rating]) = {
val testData = testUsers.map(r => (r.user, r.product)).groupByKey
val n = testData.count
val recommendations = model
.recommendProductsForUsers(20)
.mapValues(_.map(r => r.product))
val hits = testData
.join(recommendations)
.values
.map{case (xs, ys) => xs.toSet.intersect(ys.toSet).size}
.sum
hits / n
}
备注:
distinct
是一项昂贵的操作,在这里完全过时,因为您可以从分组数据中获得相同的信息- 而不是
groupBy
后跟投影 (map
),先投影后分组。如果您只想要一个产品 ID,则没有理由转移完整评级。