Spark MLLib 的 LassoWithSGD 不能缩放?

Spark MLLib's LassoWithSGD doesn't scale?

我有类似于以下的代码:

val fileContent = sc.textFile("file:///myfile")

val dataset = fileContent.map(row => {
    val explodedRow = row.split(",").map(s => s.toDouble)

    new LabeledPoint(explodedRow(13), Vectors.dense(

    Array(explodedRow(10), explodedRow(11), explodedRow(12))
))})

val algo = new LassoWithSGD().setIntercept(true)

val lambda = 0.0
algo.optimizer.setRegParam(lambda)
algo.optimizer.setNumIterations(100)
algo.optimizer.setStepSize(1.0)

val model = algo.run(dataset)

我运行在我的 20 核虚拟服务器上的云中安装它。该文件是一个包含几百万行的 "local"(即不在 HDFS 中)文件。我 运行 在本地模式下使用 sbt 运行 (即我不使用集群,我不使用 spark-submit)。

我原以为随着我将 spark.master=local[*] 设置从 local[8] 增加到 local[40],速度会越来越快。相反,无论我使用什么设置,它都需要相同的时间(但我从 Spark UI 注意到我的执行者在任何给定时间的最大活动任务数等于预期数量,即~8 代表 local[8],~40 代表 local[40],等等——看来并行化有效)。

默认情况下,我的数据集 RDD 的分区数是 4。我尝试将分区数强制设置为 20,但没有成功——事实上,它进一步减慢了 Lasso 算法...

我对缩放过程的预期不正确吗?有人可以帮我解决这个问题吗?

Is my expectation of the scaling process incorrect?

嗯,有点。我希望你不介意我用一点 Python 来证明我的观点。

  1. 大方地说几百万行实际上是一千万行。具有 40 000 000 个值(截距 + 3 个特征 + 每行标签),它提供了大约 380 MB 的数据(Java Doubledouble-precision 64-bit IEEE 754 floating point)。让我们创建一些虚拟数据:

    import numpy as np
    
    n = 10 * 1000**2
    X = np.random.uniform(size=(n, 4))  # Features
    y = np.random.uniform(size=(n, 1))  # Labels
    theta = np.random.uniform(size=(4, 1))  # Estimated parameters
    
  2. 梯度下降的每一步(因为LassoWithSGD的默认miniBatchFraction是1.0所以不是真正随机的)忽略正则化需要这样操作。

    def step(X, y, theta):
        return ((X.dot(theta) - y) * X).sum(0)
    

    那么让我们看看在本地处理我们的数据需要多长时间:

    %timeit -n 15 step(X, y, theta)
    ## 15 loops, best of 3: 743 ms per loop
    

    每步不到一秒,没有任何额外的优化。直觉上它非常快,匹配起来并不容易。只是为了好玩,让我们看看为这样的数据获得封闭形式的解决方案需要多少

    %timeit -n 15 np.linalg.inv(X.transpose().dot(X)).dot(X.transpose()).dot(y)
    ## 15 loops, best of 3: 1.33 s per loop
    
  3. 现在让我们回到 Spark。可以并行计算单个点的残差。因此,当您增加并行处理的分区数量时,这是线性扩展的部分。

    问题是你必须在本地聚合数据,序列化,传输到驱动程序,在本地反序列化和减少每一步后得到最终结果。然后你计算新的 theta,序列化发回等等。

    所有这些都可以通过正确使用小批量和一些进一步的优化来改进,但最终你会受到整个系统延迟的限制。值得注意的是,当你在工作端增加并行度时,你也会增加必须在驱动程序上顺序执行的工作量,反之亦然。 Amdahl's law 会以某种方式咬你。

    此外,以上所有内容都忽略了实际实施。

    现在让我们进行另一个实验。首先是一些虚拟数据:

    nCores = 8  # Number of cores on local machine I use for tests
    rdd = sc.parallelize([], nCores)
    

    和基准:

    %timeit -n 40 rdd.mapPartitions(lambda x: x).count()
    ## 40 loops, best of 3: 82.3 ms per loop
    

    这意味着在没有任何实际处理或网络流量的情况下,如果有 8 个内核,我们将无法通过增加 Spark 中的并行度来做得更好(假设并行化的线性可扩展性,每个分区 743 毫秒/8 = 92.875 毫秒)部分)

以上总结:

  • 如果可以使用梯度下降的封闭形式解决方案轻松地在本地处理数据,那只是浪费时间。如果你想增加并行度/减少延迟,你可以使用好的线性代数库
  • Spark 旨在处理大量数据而不是减少延迟。如果您的数据适合几年前智能手机的内存,这是一个好兆头,这不是正确的工具
  • 如果计算成本低廉,那么固定成本将成为限制因素

旁注:

  • 通常来说,每台机器的内核数相对较多并不是最佳选择,除非您可以将其与 IO 吞吐量相匹配