在带有字符串字段的 spark 中将数据框用于决策树分类器
Use dataframes for Decision tree classifier in spark with string fields
我已经设法让我的决策树分类器适用于基于 RDD 的 API,但现在我正尝试切换到 Spark 中基于数据帧的 API。
我有一个这样的数据集(但有更多字段):
国家、目的地、持续时间、标签
Belgium, France, 10, 0
Bosnia, USA, 120, 1
Germany, Spain, 30, 0
首先,我将我的 csv 文件加载到数据框中:
val data = session.read
.format("org.apache.spark.csv")
.option("header", "true")
.csv("/home/Datasets/data/dataset.csv")
然后我将字符串列转换为数字列
val stringColumns = Array("country", "destination")
val index_transformers = stringColumns.map(
cname => new StringIndexer()
.setInputCol(cname)
.setOutputCol(s"${cname}_index")
)
然后我 assemble 使用 VectorAssembler 将我所有的特征整合到一个向量中,就像这样:
val assembler = new VectorAssembler()
.setInputCols(Array("country_index", "destination_index", "duration_index"))
.setOutputCol("features")
我将我的数据分成训练和测试:
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
然后我创建我的 DecisionTree 分类器
val dt = new DecisionTreeClassifier()
.setLabelCol("label")
.setFeaturesCol("features")
然后我使用管道进行所有转换
val pipeline = new Pipeline()
.setStages(Array(index_transformers, assembler, dt))
我训练我的模型并将其用于预测:
val model = pipeline.fit(trainingData)
val predictions = model.transform(testData)
但是我遇到了一些我不明白的错误:
当我 运行 我的代码是这样的时候,我有这个错误:
[error] found : Array[org.apache.spark.ml.feature.StringIndexer]
[error] required: org.apache.spark.ml.PipelineStage
[error] .setStages(Array(index_transformers, assembler,dt))
所以我所做的是,我在 index_transformers val 之后和 val assembler 之前添加了一个管道:
val index_pipeline = new Pipeline().setStages(index_transformers)
val index_model = index_pipeline.fit(data)
val df_indexed = index_model.transform(data)
我使用新的 df_indexed 数据框作为训练集和测试集,并使用 assembler 和 dt
从管道中删除了 index_transformers
val Array(trainingData, testData) = df_indexed.randomSplit(Array(0.7, 0.3))
val pipeline = new Pipeline()
.setStages(Array(assembler,dt))
我得到这个错误:
Exception in thread "main" java.lang.IllegalArgumentException: Data type StringType is not supported.
它基本上说我在 String 上使用 VectorAssembler,而我告诉它在现在有一个数字 column_index 的 df_indexed 上使用它,但它似乎没有在 vectorAssembler 中使用它,我只是不明白..
谢谢
编辑
现在我几乎成功了:
val data = session.read
.format("org.apache.spark.csv")
.option("header", "true")
.csv("/home/hvfd8529/Datasets/dataOINIS/dataset.csv")
val stringColumns = Array("country_index", "destination_index", "duration_index")
val stringColumns_index = stringColumns.map(c => s"${c}_index")
val index_transformers = stringColumns.map(
cname => new StringIndexer()
.setInputCol(cname)
.setOutputCol(s"${cname}_index")
)
val assembler = new VectorAssembler()
.setInputCols(stringColumns_index)
.setOutputCol("features")
val labelIndexer = new StringIndexer()
.setInputCol("label")
.setOutputCol("indexedLabel")
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
// Train a DecisionTree model.
val dt = new DecisionTreeClassifier()
.setLabelCol("indexedLabel")
.setFeaturesCol("features")
.setImpurity("entropy")
.setMaxBins(1000)
.setMaxDepth(15)
// Convert indexed labels back to original labels.
val labelConverter = new IndexToString()
.setInputCol("prediction")
.setOutputCol("predictedLabel")
.setLabels(labelIndexer.labels())
val stages = index_transformers :+ assembler :+ labelIndexer :+ dt :+ labelConverter
val pipeline = new Pipeline()
.setStages(stages)
// Train model. This also runs the indexers.
val model = pipeline.fit(trainingData)
// Make predictions.
val predictions = model.transform(testData)
// Select example rows to display.
predictions.select("predictedLabel", "label", "indexedFeatures").show(5)
// Select (prediction, true label) and compute test error.
val evaluator = new MulticlassClassificationEvaluator()
.setLabelCol("indexedLabel")
.setPredictionCol("prediction")
.setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
println("accuracy = " + accuracy)
val treeModel = model.stages(2).asInstanceOf[DecisionTreeClassificationModel]
println("Learned classification tree model:\n" + treeModel.toDebugString)
除了现在我说这个时出错 :
value labels is not a member of org.apache.spark.ml.feature.StringIndexer
我不明白,因为我正在关注 spark doc 上的示例:/
应该是:
val pipeline = new Pipeline()
.setStages(index_transformers ++ Array(assembler, dt): Array[PipelineStage])
我对第一个问题做了什么:
val stages = index_transformers :+ assembler :+ labelIndexer :+ rf :+ labelConverter
val pipeline = new Pipeline()
.setStages(stages)
对于我的第二个标签问题,我需要像这样使用 .fit(data)
val labelIndexer = new StringIndexer()
.setInputCol("label_fraude")
.setOutputCol("indexedLabel")
.fit(data)
我已经设法让我的决策树分类器适用于基于 RDD 的 API,但现在我正尝试切换到 Spark 中基于数据帧的 API。
我有一个这样的数据集(但有更多字段):
国家、目的地、持续时间、标签
Belgium, France, 10, 0
Bosnia, USA, 120, 1
Germany, Spain, 30, 0
首先,我将我的 csv 文件加载到数据框中:
val data = session.read
.format("org.apache.spark.csv")
.option("header", "true")
.csv("/home/Datasets/data/dataset.csv")
然后我将字符串列转换为数字列
val stringColumns = Array("country", "destination")
val index_transformers = stringColumns.map(
cname => new StringIndexer()
.setInputCol(cname)
.setOutputCol(s"${cname}_index")
)
然后我 assemble 使用 VectorAssembler 将我所有的特征整合到一个向量中,就像这样:
val assembler = new VectorAssembler()
.setInputCols(Array("country_index", "destination_index", "duration_index"))
.setOutputCol("features")
我将我的数据分成训练和测试:
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
然后我创建我的 DecisionTree 分类器
val dt = new DecisionTreeClassifier()
.setLabelCol("label")
.setFeaturesCol("features")
然后我使用管道进行所有转换
val pipeline = new Pipeline()
.setStages(Array(index_transformers, assembler, dt))
我训练我的模型并将其用于预测:
val model = pipeline.fit(trainingData)
val predictions = model.transform(testData)
但是我遇到了一些我不明白的错误:
当我 运行 我的代码是这样的时候,我有这个错误:
[error] found : Array[org.apache.spark.ml.feature.StringIndexer]
[error] required: org.apache.spark.ml.PipelineStage
[error] .setStages(Array(index_transformers, assembler,dt))
所以我所做的是,我在 index_transformers val 之后和 val assembler 之前添加了一个管道:
val index_pipeline = new Pipeline().setStages(index_transformers)
val index_model = index_pipeline.fit(data)
val df_indexed = index_model.transform(data)
我使用新的 df_indexed 数据框作为训练集和测试集,并使用 assembler 和 dt
从管道中删除了 index_transformersval Array(trainingData, testData) = df_indexed.randomSplit(Array(0.7, 0.3))
val pipeline = new Pipeline()
.setStages(Array(assembler,dt))
我得到这个错误:
Exception in thread "main" java.lang.IllegalArgumentException: Data type StringType is not supported.
它基本上说我在 String 上使用 VectorAssembler,而我告诉它在现在有一个数字 column_index 的 df_indexed 上使用它,但它似乎没有在 vectorAssembler 中使用它,我只是不明白..
谢谢
编辑
现在我几乎成功了:
val data = session.read
.format("org.apache.spark.csv")
.option("header", "true")
.csv("/home/hvfd8529/Datasets/dataOINIS/dataset.csv")
val stringColumns = Array("country_index", "destination_index", "duration_index")
val stringColumns_index = stringColumns.map(c => s"${c}_index")
val index_transformers = stringColumns.map(
cname => new StringIndexer()
.setInputCol(cname)
.setOutputCol(s"${cname}_index")
)
val assembler = new VectorAssembler()
.setInputCols(stringColumns_index)
.setOutputCol("features")
val labelIndexer = new StringIndexer()
.setInputCol("label")
.setOutputCol("indexedLabel")
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
// Train a DecisionTree model.
val dt = new DecisionTreeClassifier()
.setLabelCol("indexedLabel")
.setFeaturesCol("features")
.setImpurity("entropy")
.setMaxBins(1000)
.setMaxDepth(15)
// Convert indexed labels back to original labels.
val labelConverter = new IndexToString()
.setInputCol("prediction")
.setOutputCol("predictedLabel")
.setLabels(labelIndexer.labels())
val stages = index_transformers :+ assembler :+ labelIndexer :+ dt :+ labelConverter
val pipeline = new Pipeline()
.setStages(stages)
// Train model. This also runs the indexers.
val model = pipeline.fit(trainingData)
// Make predictions.
val predictions = model.transform(testData)
// Select example rows to display.
predictions.select("predictedLabel", "label", "indexedFeatures").show(5)
// Select (prediction, true label) and compute test error.
val evaluator = new MulticlassClassificationEvaluator()
.setLabelCol("indexedLabel")
.setPredictionCol("prediction")
.setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
println("accuracy = " + accuracy)
val treeModel = model.stages(2).asInstanceOf[DecisionTreeClassificationModel]
println("Learned classification tree model:\n" + treeModel.toDebugString)
除了现在我说这个时出错 :
value labels is not a member of org.apache.spark.ml.feature.StringIndexer
我不明白,因为我正在关注 spark doc 上的示例:/
应该是:
val pipeline = new Pipeline()
.setStages(index_transformers ++ Array(assembler, dt): Array[PipelineStage])
我对第一个问题做了什么:
val stages = index_transformers :+ assembler :+ labelIndexer :+ rf :+ labelConverter
val pipeline = new Pipeline()
.setStages(stages)
对于我的第二个标签问题,我需要像这样使用 .fit(data)
val labelIndexer = new StringIndexer()
.setInputCol("label_fraude")
.setOutputCol("indexedLabel")
.fit(data)