带有 RandomForest 的 Spark ML Pipeline 在 20MB 数据集上花费的时间太长
Spark ML Pipeline with RandomForest takes too long on 20MB dataset
我正在使用 Spark ML 运行 进行一些 ML 实验,在 20MB 的小型数据集 (Poker dataset) 和带有参数网格的随机森林上,需要 1 小时 30 分钟才能完成.与 scikit-learn 类似,它花费的时间要少得多。
环境方面,我测试的是2个slave,每个15GB内存,24核。我认为它不应该花那么长时间,我想知道问题是否出在我的代码中,因为我对 Spark 还很陌生。
这里是:
df = pd.read_csv(http://archive.ics.uci.edu/ml/machine-learning-databases/poker/poker-hand-testing.data)
dataframe = sqlContext.createDataFrame(df)
train, test = dataframe.randomSplit([0.7, 0.3])
columnTypes = dataframe.dtypes
for ct in columnTypes:
if ct[1] == 'string' and ct[0] != 'label':
categoricalCols += [ct[0]]
elif ct[0] != 'label':
numericCols += [ct[0]]
stages = []
for categoricalCol in categoricalCols:
stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol+"Index")
stages += [stringIndexer]
assemblerInputs = map(lambda c: c + "Index", categoricalCols) + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]
labelIndexer = StringIndexer(inputCol='label', outputCol='indexedLabel', handleInvalid='skip')
stages += [labelIndexer]
estimator = RandomForestClassifier(labelCol="indexedLabel", featuresCol="features")
stages += [estimator]
parameters = {"maxDepth" : [3, 5, 10, 15], "maxBins" : [6, 12, 24, 32], "numTrees" : [3, 5, 10]}
paramGrid = ParamGridBuilder()
for key, value in parameters.iteritems():
paramGrid.addGrid(estimator.getParam(key), value)
estimatorParamMaps = (paramGrid.build())
pipeline = Pipeline(stages=stages)
crossValidator = CrossValidator(estimator=pipeline, estimatorParamMaps=estimatorParamMaps, evaluator=MulticlassClassificationEvaluator(labelCol='indexedLabel', predictionCol='prediction', metricName='f1'), numFolds=3)
pipelineModel = crossValidator.fit(train)
predictions = pipelineModel.transform(test)
evaluator = pipeline.getEvaluator().evaluate(predictions)
提前致谢,非常感谢 comments/suggestions:)
以下内容可能无法完全解决您的问题,但应该可以为您提供一些入门指南。
您面临的第一个问题是数据量与资源不成比例。
这意味着由于您正在并行化本地集合(pandas 数据帧),Spark 将使用默认的并行配置。这很可能导致 48
个分区,每个分区少于 0.5mb
个。 (Spark 不能很好地处理小文件和小分区)
第二个问题与 Spark 中的树模型使用的昂贵optimizations/approximations技术有关。
Spark 树模型使用一些技巧来优化存储连续变量。对于小数据,只得到精确的分割要便宜得多。
在这种情况下,它主要使用近似分位数。
通常,在单机框架场景下,如scikit
,树模型使用连续特征的唯一特征值作为最佳拟合计算的分割候选。而在 Apache Spark 中,树模型使用每个特征的分位数作为分割候选。
还要补充一点,你也不应该忘记交叉验证是一项繁重而漫长的任务,因为它与你的 3 个超参数的组合乘以折叠数乘以训练每个模型所花费的时间成正比(网格搜索方法)。您可能希望一开始就根据示例缓存数据,但它仍然不会为您节省太多时间。我认为 spark 对于如此大的数据量来说有点矫枉过正。您可能想改用 scikit learn 并可能使用 spark-sklearn 进行分布式本地模型训练。
Spark 会假设数据是分布的和大的,分别和顺序地学习每个模型。
您当然可以使用基于列数据的文件格式(例如 parquet 和 tuning spark 本身等)来优化性能。这里讨论的范围太广了。
您可以在以下博文中阅读有关使用 spark-mllib 的树模型可伸缩性的更多信息:
我正在使用 Spark ML 运行 进行一些 ML 实验,在 20MB 的小型数据集 (Poker dataset) 和带有参数网格的随机森林上,需要 1 小时 30 分钟才能完成.与 scikit-learn 类似,它花费的时间要少得多。
环境方面,我测试的是2个slave,每个15GB内存,24核。我认为它不应该花那么长时间,我想知道问题是否出在我的代码中,因为我对 Spark 还很陌生。
这里是:
df = pd.read_csv(http://archive.ics.uci.edu/ml/machine-learning-databases/poker/poker-hand-testing.data)
dataframe = sqlContext.createDataFrame(df)
train, test = dataframe.randomSplit([0.7, 0.3])
columnTypes = dataframe.dtypes
for ct in columnTypes:
if ct[1] == 'string' and ct[0] != 'label':
categoricalCols += [ct[0]]
elif ct[0] != 'label':
numericCols += [ct[0]]
stages = []
for categoricalCol in categoricalCols:
stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol+"Index")
stages += [stringIndexer]
assemblerInputs = map(lambda c: c + "Index", categoricalCols) + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]
labelIndexer = StringIndexer(inputCol='label', outputCol='indexedLabel', handleInvalid='skip')
stages += [labelIndexer]
estimator = RandomForestClassifier(labelCol="indexedLabel", featuresCol="features")
stages += [estimator]
parameters = {"maxDepth" : [3, 5, 10, 15], "maxBins" : [6, 12, 24, 32], "numTrees" : [3, 5, 10]}
paramGrid = ParamGridBuilder()
for key, value in parameters.iteritems():
paramGrid.addGrid(estimator.getParam(key), value)
estimatorParamMaps = (paramGrid.build())
pipeline = Pipeline(stages=stages)
crossValidator = CrossValidator(estimator=pipeline, estimatorParamMaps=estimatorParamMaps, evaluator=MulticlassClassificationEvaluator(labelCol='indexedLabel', predictionCol='prediction', metricName='f1'), numFolds=3)
pipelineModel = crossValidator.fit(train)
predictions = pipelineModel.transform(test)
evaluator = pipeline.getEvaluator().evaluate(predictions)
提前致谢,非常感谢 comments/suggestions:)
以下内容可能无法完全解决您的问题,但应该可以为您提供一些入门指南。
您面临的第一个问题是数据量与资源不成比例。
这意味着由于您正在并行化本地集合(pandas 数据帧),Spark 将使用默认的并行配置。这很可能导致 48
个分区,每个分区少于 0.5mb
个。 (Spark 不能很好地处理小文件和小分区)
第二个问题与 Spark 中的树模型使用的昂贵optimizations/approximations技术有关。
Spark 树模型使用一些技巧来优化存储连续变量。对于小数据,只得到精确的分割要便宜得多。 在这种情况下,它主要使用近似分位数。
通常,在单机框架场景下,如scikit
,树模型使用连续特征的唯一特征值作为最佳拟合计算的分割候选。而在 Apache Spark 中,树模型使用每个特征的分位数作为分割候选。
还要补充一点,你也不应该忘记交叉验证是一项繁重而漫长的任务,因为它与你的 3 个超参数的组合乘以折叠数乘以训练每个模型所花费的时间成正比(网格搜索方法)。您可能希望一开始就根据示例缓存数据,但它仍然不会为您节省太多时间。我认为 spark 对于如此大的数据量来说有点矫枉过正。您可能想改用 scikit learn 并可能使用 spark-sklearn 进行分布式本地模型训练。
Spark 会假设数据是分布的和大的,分别和顺序地学习每个模型。
您当然可以使用基于列数据的文件格式(例如 parquet 和 tuning spark 本身等)来优化性能。这里讨论的范围太广了。
您可以在以下博文中阅读有关使用 spark-mllib 的树模型可伸缩性的更多信息: