Hypertuning a logistic regression pipeline model in PySpark
I am trying to hypertune a logistic regression model, and I keep getting the error 'label does not exist'.
This is an income classifier model in which the label is the income column. I have tried the solutions available on the internet but could not resolve it, and I have been stuck on this for the past two days. It would be a great help if someone could point out my mistake (see the note after the cross-validation code below).
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

cat_cols=['workclass','education','marital_status','occupation','relationship','race','sex','native_country','income']
StringIndexer_workclass=StringIndexer(inputCol='workclass',outputCol='workclass_Ind')
StringIndexer_education=StringIndexer(inputCol='education',outputCol='education_Ind')
StringIndexer_marital_status=StringIndexer(inputCol='marital_status',outputCol='marital_status_Ind')
StringIndexer_occupation=StringIndexer(inputCol='occupation',outputCol='occupation_Ind')
StringIndexer_relationship=StringIndexer(inputCol='relationship',outputCol='relationship_Ind')
StringIndexer_race=StringIndexer(inputCol='race',outputCol='race_Ind')
StringIndexer_sex=StringIndexer(inputCol='sex',outputCol='sex_Ind')
StringIndexer_native_country=StringIndexer(inputCol='native_country',outputCol='native_country_Ind')
StringIndexer_income=StringIndexer(inputCol='income', outputCol='income_class')
OneHotEncoder_workclass=OneHotEncoder(inputCol='workclass_Ind',outputCol='workclass_feat')
OneHotEncoder_education=OneHotEncoder(inputCol='education_Ind',outputCol='education_feat')
OneHotEncoder_marital_status=OneHotEncoder(inputCol='marital_status_Ind',outputCol='marital_status_feat')
OneHotEncoder_occupation=OneHotEncoder(inputCol='occupation_Ind',outputCol='occupation_feat')
OneHotEncoder_relationship=OneHotEncoder(inputCol='relationship_Ind',outputCol='relationship_feat')
OneHotEncoder_race=OneHotEncoder(inputCol='race_Ind',outputCol='race_feat')
OneHotEncoder_sex=OneHotEncoder(inputCol='sex_Ind',outputCol='sex_feat')
OneHotEncoder_native_country = OneHotEncoder(inputCol='native_country_Ind', outputCol='native_country_feat')
num_cols= ['age', 'fnlwgt', 'education_num', 'capital_gain', 'capital_loss', 'hours_per_week']
for col in num_cols:
    data = data.withColumn(col, data[col].cast("integer"))
assemble_cols=['age','workclass_feat','fnlwgt','education_feat','education_num','marital_status_feat','occupation_feat','relationship_feat',
'race_feat','sex_feat','capital_gain','capital_loss','hours_per_week','native_country_Ind']
assemble_features=VectorAssembler(inputCols=assemble_cols,outputCol='features')
# Generate base pipeline
basePipeline=[StringIndexer_workclass, StringIndexer_education, StringIndexer_marital_status, StringIndexer_occupation,
StringIndexer_relationship, StringIndexer_race, StringIndexer_sex, StringIndexer_native_country,
OneHotEncoder_workclass, OneHotEncoder_education, OneHotEncoder_marital_status,
OneHotEncoder_occupation, OneHotEncoder_relationship, OneHotEncoder_race, OneHotEncoder_sex, OneHotEncoder_native_country,
StringIndexer_income, assemble_features]
log_reg = LogisticRegression(maxIter=5, featuresCol='features', labelCol='income_class')
pl_log_reg = basePipeline + [log_reg]
log_reg_pipeline = Pipeline(stages=pl_log_reg)
training_data, testing_data = data.randomSplit([0.8, 0.2], 3)
print("Number of training records: " + str(training_data.count()))
print("Number of testing records : " + str(testing_data.count()))
testing_data = testing_data.drop('income')
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
paramGrid = ParamGridBuilder()\
.addGrid(log_reg.elasticNetParam,[0.2, 0.6])\
.addGrid(log_reg.regParam, [0.1, 0.01])\
.build()
crossval = CrossValidator().setEstimator(log_reg_pipeline).setEvaluator(BinaryClassificationEvaluator())\
.setEstimatorParamMaps(paramGrid).setNumFolds(2)
# Run cross-validation, and choose the best set of parameters.
print(training_data.columns)
cvModel = crossval.fit(training_data)
# Make predictions on the test data. cvModel uses the best model found.
prediction = cvModel.transform(testing_data)
selected = prediction.select("features", "probability", "prediction")
for row in selected.collect():
    print(row)
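Note: the 'label does not exist' error most likely comes from the evaluator rather than the pipeline. BinaryClassificationEvaluator() is created with its default labelCol='label', while the pipeline only ever produces 'income_class' (from StringIndexer_income), so the evaluation step inside CrossValidator cannot find a column named 'label'. A minimal sketch of the change, assuming everything else above stays as it is:

# Point the evaluator at the label column the pipeline actually produces.
evaluator = BinaryClassificationEvaluator(labelCol='income_class')
crossval = CrossValidator(estimator=log_reg_pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=2)
cvModel = crossval.fit(training_data)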
Without hyperparameter tuning:
# fit the pipeline on the training data
log_reg_model = log_reg_pipeline.fit(training_data)
# transform the data
testing_data = log_reg_model.transform(testing_data)
# view some of the columns generated
print(testing_data.columns)
testing_data.select("features", "probability", "prediction").show(n=5)
+--------------------+--------------------+----------+
| features| probability|prediction|
+--------------------+--------------------+----------+
|(67,[0,3,10,21,27...|[0.98746926802062...| 0.0|
|(67,[0,3,10,16,27...|[0.96126191802708...| 0.0|
|(67,[0,3,10,21,27...|[0.98753467015818...| 0.0|
|(67,[0,3,10,16,27...|[0.97845137714609...| 0.0|
|(67,[0,3,10,16,27...|[0.97542170455525...| 0.0|
+--------------------+--------------------+----------+
After a lot of research and digging, I finally got a working pipeline model.
from time import time
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

cat_cols=['workclass','education','marital_status','occupation','relationship','race','sex','native_country','income']
StringIndexer_workclass=StringIndexer(inputCol='workclass',outputCol='workclass_Ind')
StringIndexer_education=StringIndexer(inputCol='education',outputCol='education_Ind')
StringIndexer_marital_status=StringIndexer(inputCol='marital_status',outputCol='marital_status_Ind')
StringIndexer_occupation=StringIndexer(inputCol='occupation',outputCol='occupation_Ind')
StringIndexer_relationship=StringIndexer(inputCol='relationship',outputCol='relationship_Ind')
StringIndexer_race=StringIndexer(inputCol='race',outputCol='race_Ind')
StringIndexer_sex=StringIndexer(inputCol='sex',outputCol='sex_Ind')
StringIndexer_native_country=StringIndexer(inputCol='native_country',outputCol='native_country_Ind')
StringIndexer_income=StringIndexer(inputCol='income', outputCol='income_class')
OneHotEncoder_workclass=OneHotEncoder(inputCol='workclass_Ind',outputCol='workclass_feat')
OneHotEncoder_education=OneHotEncoder(inputCol='education_Ind',outputCol='education_feat')
OneHotEncoder_marital_status=OneHotEncoder(inputCol='marital_status_Ind',outputCol='marital_status_feat')
OneHotEncoder_occupation=OneHotEncoder(inputCol='occupation_Ind',outputCol='occupation_feat')
OneHotEncoder_relationship=OneHotEncoder(inputCol='relationship_Ind',outputCol='relationship_feat')
OneHotEncoder_race=OneHotEncoder(inputCol='race_Ind',outputCol='race_feat')
OneHotEncoder_sex=OneHotEncoder(inputCol='sex_Ind',outputCol='sex_feat')
OneHotEncoder_native_country = OneHotEncoder(inputCol='native_country_Ind', outputCol='native_country_feat')
num_cols= ['age', 'fnlwgt', 'education_num', 'capital_gain', 'capital_loss', 'hours_per_week']
for col in num_cols:
    data = data.withColumn(col, data[col].cast("integer"))
# VectorAssembler (same columns as above) - basePipeline below refers to it
assemble_cols=['age','workclass_feat','fnlwgt','education_feat','education_num','marital_status_feat','occupation_feat','relationship_feat',
'race_feat','sex_feat','capital_gain','capital_loss','hours_per_week','native_country_Ind']
assemble_features=VectorAssembler(inputCols=assemble_cols,outputCol='features')
# Generate base pipeline
basePipeline=[StringIndexer_workclass, StringIndexer_education, StringIndexer_marital_status, StringIndexer_occupation,
StringIndexer_relationship, StringIndexer_race, StringIndexer_sex, StringIndexer_native_country,
OneHotEncoder_workclass, OneHotEncoder_education, OneHotEncoder_marital_status,
OneHotEncoder_occupation, OneHotEncoder_relationship, OneHotEncoder_race, OneHotEncoder_sex, OneHotEncoder_native_country,
StringIndexer_income, assemble_features]
pipeline = Pipeline(stages=basePipeline)
pipelineModel = pipeline.fit(data)
model = pipelineModel.transform(data)
model.take(1)
from pyspark.ml.linalg import DenseVector
input_data = model.rdd.map(lambda x: (x["income_class"], DenseVector(x["features"])))
final_df = spark.createDataFrame(input_data, ["label", "features"])
final_df.show(2)
training_data, testing_data = final_df.randomSplit([0.8, 0.2], 3)
log_reg = LogisticRegression(maxIter=5, featuresCol='features', labelCol='label')
paramGrid = ParamGridBuilder().addGrid(log_reg.regParam,[0.02,0.08])\
.addGrid(log_reg.elasticNetParam,[0.2,0.6]).build()
evaluator_lr = MulticlassClassificationEvaluator(labelCol='label', metricName='accuracy')
crossval = CrossValidator(estimator=log_reg,
estimatorParamMaps=paramGrid,
evaluator=evaluator_lr,
numFolds=2)
start_time = time()
# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(training_data)
end_time = time()
training_time = end_time - start_time
print("The time taken to train the data is: %0.3f seconds" %training_time)
# Make predictions on the testing data and compute accuracy, F1, weighted precision/recall and AUC.
prediction = cvModel.transform(testing_data)
output = prediction.select("label", "features", "probability", "prediction")
acc = evaluator_lr.evaluate(output, {evaluator_lr.metricName: "accuracy"})
f1 = evaluator_lr.evaluate(output, {evaluator_lr.metricName: "f1"})
weightedPrecision = evaluator_lr.evaluate(output, {evaluator_lr.metricName: "weightedPrecision"})
weightedRecall = evaluator_lr.evaluate(output, {evaluator_lr.metricName: "weightedRecall"})
# AUC needs a binary evaluator; evaluator_lr (multiclass, metricName='accuracy') would just return accuracy again.
auc = BinaryClassificationEvaluator(labelCol='label').evaluate(prediction)
print(acc)
print(f1)
print(weightedPrecision)
print(weightedRecall)
print(auc)
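To see which combination from the grid the CrossValidator actually picked, something like the following should work with the cvModel above (a sketch; the two getters at the end assume a reasonably recent PySpark, where the tuned values are copied onto the best model):

# Average cross-validation metric (accuracy, per evaluator_lr) for each grid point.
for params, metric in zip(paramGrid, cvModel.avgMetrics):
    print({p.name: v for p, v in params.items()}, "->", metric)

# The winning LogisticRegressionModel and its tuned hyperparameters.
best_lr = cvModel.bestModel
print("best regParam:", best_lr.getRegParam())
print("best elasticNetParam:", best_lr.getElasticNetParam())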