不使用所有节点的 Spark ML 梯度提升树

Spark ML gradient boosted trees not using all nodes

我正在使用 pyspark 中的 Spark ML GBTClassifier 在 AWS EMR 集群上具有约 400k 行和约 9k 列的数据帧上训练二进制分类模型。我正在将其与我当前的解决方案进行比较,即 运行 XGBoost 在一个巨大的 EC2 上,可以将整个数据帧放入内存。

我希望我可以在 Spark 中更快地训练(并获得新的观察结果),因为它将 distributed/parallel。但是,当(通过神经节)观察我的集群时,我看到只有 3-4 个节点处于活动状态 CPU,而其余节点只是坐在那里。事实上,从表面上看,它可能只使用 一个 节点进行实际训练。

我似乎无法在文档中找到有关节点限制或分区的任何内容,或者似乎与发生这种情况的原因相关的任何内容。也许我只是误解了算法的实现,但我假设它的实现方式可以并行化训练以利用 Spark 的 EMR/cluster 方面。如果不是,那么这样做与仅在单个 EC2 的内存中进行相比有什么优势吗?我想您不必将数据加载到内存中,但这并不是一个真正的优势。

这是我的代码的一些样板。感谢您的任何想法!

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Start Spark context:
sc = pyspark.SparkContext()
sqlContext = SparkSession.builder.enableHiveSupport().getOrCreate()

# load data
df = sqlContext.sql('SELECT label, features FROM full_table WHERE train = 1')
df.cache()
print("training data loaded: {} rows".format(df.count()))

test_df = sqlContext.sql('SELECT label, features FROM full_table WHERE train = 0')
test_df.cache()
print("test data loaded: {} rows".format(test_df.count()))


#Create evaluator
evaluator = BinaryClassificationEvaluator()
evaluator.setRawPredictionCol('prob')
evaluator.setLabelCol('label')

# train model
gbt = GBTClassifier(maxIter=100, 
                    maxDepth=3, 
                    stepSize=0.1,
                    labelCol="label", 
                    seed=42)

model = gbt.fit(df)


# get predictions
gbt_preds = model.transform(test_df)
gbt_preds.show(10)


# evaluate predictions
getprob=udf(lambda v:float(v[1]),DoubleType())
preds = gbt_preds.withColumn('prob', getprob('probability'))\
        .drop('features', 'rawPrediction', 'probability', 'prediction')
preds.show(10)

auc = evaluator.evaluate(preds)
auc

旁注:我使用的表格已经矢量化。该模型使用此代码运行,它运行缓慢(训练约 10-15 分钟)并且仅使用 3-4 个(或可能仅一个)核心。

感谢上面的澄清评论。

Spark 的实现不一定比 XGBoost 快。事实上,我很期待你所看到的。

最大的因素是 XGBoost 是专门为梯度提升树设计和编写的。另一方面,Spark 的用途更广泛,并且很可能没有 XGBoost 所具有的相同类型的优化。请参阅 here 以了解 XGBoost 和 scikit-learn 的分类器算法实现之间的差异。如果您想真正了解细节,可以阅读论文,甚至阅读 XGBoost 和 Spark 实现背后的代码。

记住,XGBoost 也是 parallel/distributed。它只是在同一台机器上使用多个线程。当数据不适合单台机器时,Spark 可以帮助您 运行 算法。

我能想到的其他几个小问题是 a) Spark 确实有 non-trivial 启动时间。不同机器之间的通信也会累加起来。 b) XGBoost 是用 C++ 编写的,通常非常适合数值计算。

至于为什么Spark只使用3-4个核心,这取决于你的数据集大小,它是如何跨节点分布的,spark启动的执行者数量是多少,哪个阶段是占用大部分时间,内存配置等。您可以使用 Spark UI 来尝试弄清楚发生了什么。如果不查看数据集,很难说出为什么会发生这种情况。

希望对您有所帮助。

编辑:我刚刚发现这个很好的答案,它比较了简单 Spark 应用程序与独立 java 应用程序之间的执行时间 - 。同样的原则也适用于此,事实上更是如此,因为 XGBoost 是高度优化的。