在小数据集上执行非常慢——从哪里开始调试?

Awfully slow execution on a small datasets – where to start debugging?

我在独立模式下使用 Zeppelin NB 和 Spark 在 MacBook(i5、2.6GHz、8GB 内存)上进行了一些实验。 spark.executor/driver.memory 都得到 2g。我还在 spark-defaults.conf 中设置了 spark.serializer org.apache.spark.serializer.KryoSerializer,但这似乎被 zeppelin

忽略了

肌萎缩侧索硬化模型

我已经训练了一个具有 ~400k(隐式)评级的 ALS 模型,并希望获得 val allRecommendations = model.recommendProductsForUsers(1)

的推荐

样本集

接下来我拿个样本来玩

val sampledRecommendations = allRecommendations.sample(false, 0.05, 1234567).cache

这包含 3600 条建议。

删除用户拥有的产品推荐

接下来我想删除给定用户已经拥有的产品的所有评级,我在表格 RDD 中保存的列表 (user_id, Set[product_ids]): RDD[(Long, scala.collection.mutable.HashSet[Int])]

val productRecommendations = (sampledRecommendations
// add user portfolio to the list, but convert the key from Long to Int first
.join(usersProductsFlat.map( up => (up._1.toInt, up._2) ))
.mapValues(
    // (user, (ratings: Array[Rating], usersOwnedProducts: HashSet[Long]))
    r => (r._1
        .filter( rating => !r._2.contains(rating.product))
        .filter( rating => rating.rating > 0.5)
        .toList
    )
  )
  // In case there is no recommendation (left), remove the entry
  .filter(rating => !rating._2.isEmpty)
).cache

问题 1 在缓存样本集上调用此 (productRecommendations.count) 会生成一个阶段,其中包括 flatMap at MatrixFactorizationModel.scala:27810,000 个任务、263.6 MB 的输入数据和 196.0 MB 的随机写入。不应该使用微小的和缓存的 RDD,这里 (wr)on(g) 是怎么回事?执行计数需要将近 5 分钟!

问题 2 根据应用程序UI中的“存储”视图,调用完全缓存的usersProductsFlat.count每次需要~60秒。它的大小为 23Mb – 不是应该快很多吗?

映射到可读形式

接下来,我以某种可读的形式将其替换为广播查找映射中的名称的 ID,以放入 DF/table:

val readableRatings = (productRecommendations
    .flatMapValues(x=>x)
    .map( r => (r._1, userIdToMailBC.value(r._1), r._2.product.toInt, productIdToNameBC.value(r._2.product), r._2.rating))
).cache
val readableRatingsDF = readableRatings.toDF("user","email", "product_id", "product", "rating").cache
readableRatingsDF.registerTempTable("recommendations")

Select …有耐心

疯狂的部分从这里开始。做一个 SELECT 需要 几个小时 (我等不及完成):

%sql
SELECT COUNT(user) AS usr_cnt, product, AVG(rating) AS avg_rating
FROM recommendations
GROUP BY product


我不知道在哪里可以找到瓶颈,显然这里正在发生一些巨大的混乱!我可以从哪里开始寻找?

您的分区数量可能太大了。我认为当 运行 在本地模式下你应该使用大约 200 而不是 10000。你可以用不同的方式设置分区数。我建议您编辑 Spark 配置文件中的 spark.default.parallelism 标志。