在小数据集上执行非常慢——从哪里开始调试?
Awfully slow execution on a small datasets – where to start debugging?
我在独立模式下使用 Zeppelin NB 和 Spark 在 MacBook(i5、2.6GHz、8GB 内存)上进行了一些实验。 spark.executor/driver.memory 都得到 2g。我还在 spark-defaults.conf 中设置了 spark.serializer org.apache.spark.serializer.KryoSerializer
,但这似乎被 zeppelin
忽略了
肌萎缩侧索硬化模型
我已经训练了一个具有 ~400k(隐式)评级的 ALS 模型,并希望获得 val allRecommendations = model.recommendProductsForUsers(1)
的推荐
样本集
接下来我拿个样本来玩
val sampledRecommendations = allRecommendations.sample(false, 0.05, 1234567).cache
这包含 3600 条建议。
删除用户拥有的产品推荐
接下来我想删除给定用户已经拥有的产品的所有评级,我在表格 RDD 中保存的列表 (user_id, Set[product_ids]): RDD[(Long, scala.collection.mutable.HashSet[Int])]
val productRecommendations = (sampledRecommendations
// add user portfolio to the list, but convert the key from Long to Int first
.join(usersProductsFlat.map( up => (up._1.toInt, up._2) ))
.mapValues(
// (user, (ratings: Array[Rating], usersOwnedProducts: HashSet[Long]))
r => (r._1
.filter( rating => !r._2.contains(rating.product))
.filter( rating => rating.rating > 0.5)
.toList
)
)
// In case there is no recommendation (left), remove the entry
.filter(rating => !rating._2.isEmpty)
).cache
问题 1
在缓存样本集上调用此 (productRecommendations.count
) 会生成一个阶段,其中包括 flatMap at MatrixFactorizationModel.scala:278
和 10,000 个任务、263.6 MB 的输入数据和 196.0 MB 的随机写入。不应该使用微小的和缓存的 RDD,这里 (wr)on(g) 是怎么回事?执行计数需要将近 5 分钟!
问题 2
根据应用程序UI中的“存储”视图,调用完全缓存的usersProductsFlat.count
每次需要~60秒。它的大小为 23Mb – 不是应该快很多吗?
映射到可读形式
接下来,我以某种可读的形式将其替换为广播查找映射中的名称的 ID,以放入 DF/table:
val readableRatings = (productRecommendations
.flatMapValues(x=>x)
.map( r => (r._1, userIdToMailBC.value(r._1), r._2.product.toInt, productIdToNameBC.value(r._2.product), r._2.rating))
).cache
val readableRatingsDF = readableRatings.toDF("user","email", "product_id", "product", "rating").cache
readableRatingsDF.registerTempTable("recommendations")
Select …有耐心
疯狂的部分从这里开始。做一个 SELECT 需要 几个小时 (我等不及完成):
%sql
SELECT COUNT(user) AS usr_cnt, product, AVG(rating) AS avg_rating
FROM recommendations
GROUP BY product
我不知道在哪里可以找到瓶颈,显然这里正在发生一些巨大的混乱!我可以从哪里开始寻找?
您的分区数量可能太大了。我认为当 运行 在本地模式下你应该使用大约 200 而不是 10000。你可以用不同的方式设置分区数。我建议您编辑 Spark 配置文件中的 spark.default.parallelism 标志。
我在独立模式下使用 Zeppelin NB 和 Spark 在 MacBook(i5、2.6GHz、8GB 内存)上进行了一些实验。 spark.executor/driver.memory 都得到 2g。我还在 spark-defaults.conf 中设置了 spark.serializer org.apache.spark.serializer.KryoSerializer
,但这似乎被 zeppelin
肌萎缩侧索硬化模型
我已经训练了一个具有 ~400k(隐式)评级的 ALS 模型,并希望获得 val allRecommendations = model.recommendProductsForUsers(1)
样本集
接下来我拿个样本来玩
val sampledRecommendations = allRecommendations.sample(false, 0.05, 1234567).cache
这包含 3600 条建议。
删除用户拥有的产品推荐
接下来我想删除给定用户已经拥有的产品的所有评级,我在表格 RDD 中保存的列表 (user_id, Set[product_ids]): RDD[(Long, scala.collection.mutable.HashSet[Int])]
val productRecommendations = (sampledRecommendations
// add user portfolio to the list, but convert the key from Long to Int first
.join(usersProductsFlat.map( up => (up._1.toInt, up._2) ))
.mapValues(
// (user, (ratings: Array[Rating], usersOwnedProducts: HashSet[Long]))
r => (r._1
.filter( rating => !r._2.contains(rating.product))
.filter( rating => rating.rating > 0.5)
.toList
)
)
// In case there is no recommendation (left), remove the entry
.filter(rating => !rating._2.isEmpty)
).cache
问题 1
在缓存样本集上调用此 (productRecommendations.count
) 会生成一个阶段,其中包括 flatMap at MatrixFactorizationModel.scala:278
和 10,000 个任务、263.6 MB 的输入数据和 196.0 MB 的随机写入。不应该使用微小的和缓存的 RDD,这里 (wr)on(g) 是怎么回事?执行计数需要将近 5 分钟!
问题 2
根据应用程序UI中的“存储”视图,调用完全缓存的usersProductsFlat.count
每次需要~60秒。它的大小为 23Mb – 不是应该快很多吗?
映射到可读形式
接下来,我以某种可读的形式将其替换为广播查找映射中的名称的 ID,以放入 DF/table:
val readableRatings = (productRecommendations
.flatMapValues(x=>x)
.map( r => (r._1, userIdToMailBC.value(r._1), r._2.product.toInt, productIdToNameBC.value(r._2.product), r._2.rating))
).cache
val readableRatingsDF = readableRatings.toDF("user","email", "product_id", "product", "rating").cache
readableRatingsDF.registerTempTable("recommendations")
Select …有耐心
疯狂的部分从这里开始。做一个 SELECT 需要 几个小时 (我等不及完成):
%sql
SELECT COUNT(user) AS usr_cnt, product, AVG(rating) AS avg_rating
FROM recommendations
GROUP BY product
我不知道在哪里可以找到瓶颈,显然这里正在发生一些巨大的混乱!我可以从哪里开始寻找?
您的分区数量可能太大了。我认为当 运行 在本地模式下你应该使用大约 200 而不是 10000。你可以用不同的方式设置分区数。我建议您编辑 Spark 配置文件中的 spark.default.parallelism 标志。