Spark MLLib 的 LassoWithSGD 不能缩放?
Spark MLLib's LassoWithSGD doesn't scale?
我有类似于以下的代码:
val fileContent = sc.textFile("file:///myfile")
val dataset = fileContent.map(row => {
val explodedRow = row.split(",").map(s => s.toDouble)
new LabeledPoint(explodedRow(13), Vectors.dense(
Array(explodedRow(10), explodedRow(11), explodedRow(12))
))})
val algo = new LassoWithSGD().setIntercept(true)
val lambda = 0.0
algo.optimizer.setRegParam(lambda)
algo.optimizer.setNumIterations(100)
algo.optimizer.setStepSize(1.0)
val model = algo.run(dataset)
我运行在我的 20 核虚拟服务器上的云中安装它。该文件是一个包含几百万行的 "local"(即不在 HDFS 中)文件。我 运行 在本地模式下使用 sbt 运行 (即我不使用集群,我不使用 spark-submit)。
我原以为随着我将 spark.master=local[*] 设置从 local[8] 增加到 local[40],速度会越来越快。相反,无论我使用什么设置,它都需要相同的时间(但我从 Spark UI 注意到我的执行者在任何给定时间的最大活动任务数等于预期数量,即~8 代表 local[8],~40 代表 local[40],等等——看来并行化有效)。
默认情况下,我的数据集 RDD 的分区数是 4。我尝试将分区数强制设置为 20,但没有成功——事实上,它进一步减慢了 Lasso 算法...
我对缩放过程的预期不正确吗?有人可以帮我解决这个问题吗?
Is my expectation of the scaling process incorrect?
嗯,有点。我希望你不介意我用一点 Python 来证明我的观点。
大方地说几百万行实际上是一千万行。具有 40 000 000 个值(截距 + 3 个特征 + 每行标签),它提供了大约 380 MB 的数据(Java Double
是 double-precision 64-bit IEEE 754 floating point)。让我们创建一些虚拟数据:
import numpy as np
n = 10 * 1000**2
X = np.random.uniform(size=(n, 4)) # Features
y = np.random.uniform(size=(n, 1)) # Labels
theta = np.random.uniform(size=(4, 1)) # Estimated parameters
梯度下降的每一步(因为LassoWithSGD
的默认miniBatchFraction
是1.0所以不是真正随机的)忽略正则化需要这样操作。
def step(X, y, theta):
return ((X.dot(theta) - y) * X).sum(0)
那么让我们看看在本地处理我们的数据需要多长时间:
%timeit -n 15 step(X, y, theta)
## 15 loops, best of 3: 743 ms per loop
每步不到一秒,没有任何额外的优化。直觉上它非常快,匹配起来并不容易。只是为了好玩,让我们看看为这样的数据获得封闭形式的解决方案需要多少
%timeit -n 15 np.linalg.inv(X.transpose().dot(X)).dot(X.transpose()).dot(y)
## 15 loops, best of 3: 1.33 s per loop
现在让我们回到 Spark。可以并行计算单个点的残差。因此,当您增加并行处理的分区数量时,这是线性扩展的部分。
问题是你必须在本地聚合数据,序列化,传输到驱动程序,在本地反序列化和减少每一步后得到最终结果。然后你计算新的 theta,序列化发回等等。
所有这些都可以通过正确使用小批量和一些进一步的优化来改进,但最终你会受到整个系统延迟的限制。值得注意的是,当你在工作端增加并行度时,你也会增加必须在驱动程序上顺序执行的工作量,反之亦然。 Amdahl's law 会以某种方式咬你。
此外,以上所有内容都忽略了实际实施。
现在让我们进行另一个实验。首先是一些虚拟数据:
nCores = 8 # Number of cores on local machine I use for tests
rdd = sc.parallelize([], nCores)
和基准:
%timeit -n 40 rdd.mapPartitions(lambda x: x).count()
## 40 loops, best of 3: 82.3 ms per loop
这意味着在没有任何实际处理或网络流量的情况下,如果有 8 个内核,我们将无法通过增加 Spark 中的并行度来做得更好(假设并行化的线性可扩展性,每个分区 743 毫秒/8 = 92.875 毫秒)部分)
以上总结:
- 如果可以使用梯度下降的封闭形式解决方案轻松地在本地处理数据,那只是浪费时间。如果你想增加并行度/减少延迟,你可以使用好的线性代数库
- Spark 旨在处理大量数据而不是减少延迟。如果您的数据适合几年前智能手机的内存,这是一个好兆头,这不是正确的工具
- 如果计算成本低廉,那么固定成本将成为限制因素
旁注:
- 通常来说,每台机器的内核数相对较多并不是最佳选择,除非您可以将其与 IO 吞吐量相匹配
我有类似于以下的代码:
val fileContent = sc.textFile("file:///myfile")
val dataset = fileContent.map(row => {
val explodedRow = row.split(",").map(s => s.toDouble)
new LabeledPoint(explodedRow(13), Vectors.dense(
Array(explodedRow(10), explodedRow(11), explodedRow(12))
))})
val algo = new LassoWithSGD().setIntercept(true)
val lambda = 0.0
algo.optimizer.setRegParam(lambda)
algo.optimizer.setNumIterations(100)
algo.optimizer.setStepSize(1.0)
val model = algo.run(dataset)
我运行在我的 20 核虚拟服务器上的云中安装它。该文件是一个包含几百万行的 "local"(即不在 HDFS 中)文件。我 运行 在本地模式下使用 sbt 运行 (即我不使用集群,我不使用 spark-submit)。
我原以为随着我将 spark.master=local[*] 设置从 local[8] 增加到 local[40],速度会越来越快。相反,无论我使用什么设置,它都需要相同的时间(但我从 Spark UI 注意到我的执行者在任何给定时间的最大活动任务数等于预期数量,即~8 代表 local[8],~40 代表 local[40],等等——看来并行化有效)。
默认情况下,我的数据集 RDD 的分区数是 4。我尝试将分区数强制设置为 20,但没有成功——事实上,它进一步减慢了 Lasso 算法...
我对缩放过程的预期不正确吗?有人可以帮我解决这个问题吗?
Is my expectation of the scaling process incorrect?
嗯,有点。我希望你不介意我用一点 Python 来证明我的观点。
大方地说几百万行实际上是一千万行。具有 40 000 000 个值(截距 + 3 个特征 + 每行标签),它提供了大约 380 MB 的数据(Java
Double
是 double-precision 64-bit IEEE 754 floating point)。让我们创建一些虚拟数据:import numpy as np n = 10 * 1000**2 X = np.random.uniform(size=(n, 4)) # Features y = np.random.uniform(size=(n, 1)) # Labels theta = np.random.uniform(size=(4, 1)) # Estimated parameters
梯度下降的每一步(因为
LassoWithSGD
的默认miniBatchFraction
是1.0所以不是真正随机的)忽略正则化需要这样操作。def step(X, y, theta): return ((X.dot(theta) - y) * X).sum(0)
那么让我们看看在本地处理我们的数据需要多长时间:
%timeit -n 15 step(X, y, theta) ## 15 loops, best of 3: 743 ms per loop
每步不到一秒,没有任何额外的优化。直觉上它非常快,匹配起来并不容易。只是为了好玩,让我们看看为这样的数据获得封闭形式的解决方案需要多少
%timeit -n 15 np.linalg.inv(X.transpose().dot(X)).dot(X.transpose()).dot(y) ## 15 loops, best of 3: 1.33 s per loop
现在让我们回到 Spark。可以并行计算单个点的残差。因此,当您增加并行处理的分区数量时,这是线性扩展的部分。
问题是你必须在本地聚合数据,序列化,传输到驱动程序,在本地反序列化和减少每一步后得到最终结果。然后你计算新的 theta,序列化发回等等。
所有这些都可以通过正确使用小批量和一些进一步的优化来改进,但最终你会受到整个系统延迟的限制。值得注意的是,当你在工作端增加并行度时,你也会增加必须在驱动程序上顺序执行的工作量,反之亦然。 Amdahl's law 会以某种方式咬你。
此外,以上所有内容都忽略了实际实施。
现在让我们进行另一个实验。首先是一些虚拟数据:
nCores = 8 # Number of cores on local machine I use for tests rdd = sc.parallelize([], nCores)
和基准:
%timeit -n 40 rdd.mapPartitions(lambda x: x).count() ## 40 loops, best of 3: 82.3 ms per loop
这意味着在没有任何实际处理或网络流量的情况下,如果有 8 个内核,我们将无法通过增加 Spark 中的并行度来做得更好(假设并行化的线性可扩展性,每个分区 743 毫秒/8 = 92.875 毫秒)部分)
以上总结:
- 如果可以使用梯度下降的封闭形式解决方案轻松地在本地处理数据,那只是浪费时间。如果你想增加并行度/减少延迟,你可以使用好的线性代数库
- Spark 旨在处理大量数据而不是减少延迟。如果您的数据适合几年前智能手机的内存,这是一个好兆头,这不是正确的工具
- 如果计算成本低廉,那么固定成本将成为限制因素
旁注:
- 通常来说,每台机器的内核数相对较多并不是最佳选择,除非您可以将其与 IO 吞吐量相匹配