在 PySpark 中对逻辑回归管道模型进行超参数调优

Hypertuning a logistic regression pipeline model in pyspark

我正在尝试对逻辑回归模型做超参数调优,但一直收到错误 'label does not exist'。这是一个收入分类器模型,标签是收入(income)列。我尝试过网上已有的解决方案,但都没有解决,过去两天一直卡在这个问题上。如果有人能帮我找出错误所在,将非常感谢。

# Categorical columns to be indexed ('income' is the target label).
cat_cols = ['workclass', 'education', 'marital_status', 'occupation',
            'relationship', 'race', 'sex', 'native_country', 'income']

# Index each categorical feature column into a numeric index column.
StringIndexer_workclass = StringIndexer(inputCol='workclass', outputCol='workclass_Ind')
StringIndexer_education = StringIndexer(inputCol='education', outputCol='education_Ind')
StringIndexer_marital_status = StringIndexer(inputCol='marital_status', outputCol='marital_status_Ind')
StringIndexer_occupation = StringIndexer(inputCol='occupation', outputCol='occupation_Ind')
StringIndexer_relationship = StringIndexer(inputCol='relationship', outputCol='relationship_Ind')
StringIndexer_race = StringIndexer(inputCol='race', outputCol='race_Ind')
StringIndexer_sex = StringIndexer(inputCol='sex', outputCol='sex_Ind')
StringIndexer_native_country = StringIndexer(inputCol='native_country', outputCol='native_country_Ind')

# Index the target column; 'income_class' is the label the classifier trains on.
StringIndexer_income = StringIndexer(inputCol='income', outputCol='income_class')

# One-hot encode each indexed categorical feature.
OneHotEncoder_workclass = OneHotEncoder(inputCol='workclass_Ind', outputCol='workclass_feat')
OneHotEncoder_education = OneHotEncoder(inputCol='education_Ind', outputCol='education_feat')
OneHotEncoder_marital_status = OneHotEncoder(inputCol='marital_status_Ind', outputCol='marital_status_feat')
OneHotEncoder_occupation = OneHotEncoder(inputCol='occupation_Ind', outputCol='occupation_feat')
OneHotEncoder_relationship = OneHotEncoder(inputCol='relationship_Ind', outputCol='relationship_feat')
OneHotEncoder_race = OneHotEncoder(inputCol='race_Ind', outputCol='race_feat')
OneHotEncoder_sex = OneHotEncoder(inputCol='sex_Ind', outputCol='sex_feat')
OneHotEncoder_native_country = OneHotEncoder(inputCol='native_country_Ind', outputCol='native_country_feat')

# Cast the numeric columns to integers so VectorAssembler accepts them.
num_cols = ['age', 'fnlwgt', 'education_num', 'capital_gain', 'capital_loss', 'hours_per_week']
for col in num_cols:
    data = data.withColumn(col, data[col].cast("integer"))

# FIX: use the one-hot encoded 'native_country_feat' (not the raw index
# 'native_country_Ind') so native_country is treated like every other
# categorical feature in the assembled vector.
assemble_cols = ['age', 'workclass_feat', 'fnlwgt', 'education_feat', 'education_num',
                 'marital_status_feat', 'occupation_feat', 'relationship_feat',
                 'race_feat', 'sex_feat', 'capital_gain', 'capital_loss',
                 'hours_per_week', 'native_country_feat']
assemble_features = VectorAssembler(inputCols=assemble_cols, outputCol='features')

# Base pipeline: index + one-hot encode all categoricals, index the label,
# then assemble every feature column into a single 'features' vector.
basePipeline = [StringIndexer_workclass, StringIndexer_education, StringIndexer_marital_status, StringIndexer_occupation,
                StringIndexer_relationship, StringIndexer_race, StringIndexer_sex, StringIndexer_native_country,
                OneHotEncoder_workclass, OneHotEncoder_education, OneHotEncoder_marital_status,
                OneHotEncoder_occupation, OneHotEncoder_relationship, OneHotEncoder_race, OneHotEncoder_sex, OneHotEncoder_native_country,
                StringIndexer_income, assemble_features]

# The classifier reads the indexed label column produced by StringIndexer_income.
log_reg = LogisticRegression(maxIter=5, featuresCol='features', labelCol='income_class')
pl_log_reg = basePipeline + [log_reg]
log_reg_pipeline = Pipeline(stages=pl_log_reg)

# 80/20 split with a fixed seed for reproducibility.
training_data, testing_data = data.randomSplit([0.8, 0.2], 3)
print("Number of training records: " + str(training_data.count()))
print("Number of testing records : " + str(testing_data.count()))
# FIX: do NOT drop 'income' from testing_data. StringIndexer_income is a
# pipeline stage, so transform() on the fitted pipeline model needs the raw
# 'income' column; dropping it makes the transform of test data fail.


from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Hyperparameter grid for the logistic regression stage of the pipeline.
paramGrid = ParamGridBuilder()\
    .addGrid(log_reg.elasticNetParam, [0.2, 0.6])\
    .addGrid(log_reg.regParam, [0.1, 0.01])\
    .build()

# FIX: the evaluator must be told which column holds the label.
# BinaryClassificationEvaluator defaults to labelCol='label', but this
# pipeline produces 'income_class' — that default is exactly what raised
# the "label does not exist" error.
crossval = (CrossValidator()
            .setEstimator(log_reg_pipeline)
            .setEvaluator(BinaryClassificationEvaluator(labelCol='income_class'))
            .setEstimatorParamMaps(paramGrid)
            .setNumFolds(2))

# Run cross-validation, and choose the best set of parameters.
print(training_data.columns)
cvModel = crossval.fit(training_data)

# Make predictions on test documents. cvModel uses the best model found.
# NOTE(review): testing_data must still contain the raw 'income' column here,
# because the pipeline's StringIndexer_income stage reads it during transform.
prediction = cvModel.transform(testing_data)
selected = prediction.select("features", "probability", "prediction")
for row in selected.collect():
    print(row)

没有超参数调整:

# fit the pipeline for the trained data
# (fits all feature-engineering stages plus the logistic regression)
log_reg_model = log_reg_pipeline.fit(training_data)

# transform the data
# NOTE(review): this rebinds testing_data to the transformed DataFrame, and it
# requires the raw 'income' column to be present because StringIndexer_income
# is one of the pipeline stages — confirm 'income' was not dropped earlier.
testing_data = log_reg_model.transform(testing_data)

# view some of the columns generated
print(testing_data.columns)

# Show a few predicted rows: assembled feature vector, class probabilities,
# and the predicted class index.
testing_data.select("features",  "probability", "prediction").show(n=5)

+--------------------+--------------------+----------+
|            features|         probability|prediction|
+--------------------+--------------------+----------+
|(67,[0,3,10,21,27...|[0.98746926802062...|       0.0|
|(67,[0,3,10,16,27...|[0.96126191802708...|       0.0|
|(67,[0,3,10,21,27...|[0.98753467015818...|       0.0|
|(67,[0,3,10,16,27...|[0.97845137714609...|       0.0|
|(67,[0,3,10,16,27...|[0.97542170455525...|       0.0|
+--------------------+--------------------+----------+

经过大量的研究和尝试,我终于得到了一个可以正常工作的流水线模型。

# Raw categorical columns (the last one, 'income', is the target).
cat_cols = [
    'workclass', 'education', 'marital_status', 'occupation',
    'relationship', 'race', 'sex', 'native_country', 'income',
]

# String -> numeric index, one indexer per categorical feature.
StringIndexer_workclass = StringIndexer(inputCol='workclass', outputCol='workclass_Ind')
StringIndexer_education = StringIndexer(inputCol='education', outputCol='education_Ind')
StringIndexer_marital_status = StringIndexer(inputCol='marital_status', outputCol='marital_status_Ind')
StringIndexer_occupation = StringIndexer(inputCol='occupation', outputCol='occupation_Ind')
StringIndexer_relationship = StringIndexer(inputCol='relationship', outputCol='relationship_Ind')
StringIndexer_race = StringIndexer(inputCol='race', outputCol='race_Ind')
StringIndexer_sex = StringIndexer(inputCol='sex', outputCol='sex_Ind')
StringIndexer_native_country = StringIndexer(inputCol='native_country', outputCol='native_country_Ind')

# The label column: income string -> numeric class index.
StringIndexer_income = StringIndexer(inputCol='income', outputCol='income_class')

# Numeric index -> one-hot vector, one encoder per indexed feature.
OneHotEncoder_workclass = OneHotEncoder(inputCol='workclass_Ind', outputCol='workclass_feat')
OneHotEncoder_education = OneHotEncoder(inputCol='education_Ind', outputCol='education_feat')
OneHotEncoder_marital_status = OneHotEncoder(inputCol='marital_status_Ind', outputCol='marital_status_feat')
OneHotEncoder_occupation = OneHotEncoder(inputCol='occupation_Ind', outputCol='occupation_feat')
OneHotEncoder_relationship = OneHotEncoder(inputCol='relationship_Ind', outputCol='relationship_feat')
OneHotEncoder_race = OneHotEncoder(inputCol='race_Ind', outputCol='race_feat')
OneHotEncoder_sex = OneHotEncoder(inputCol='sex_Ind', outputCol='sex_feat')
OneHotEncoder_native_country = OneHotEncoder(inputCol='native_country_Ind', outputCol='native_country_feat')

# Make sure the purely numeric columns really are integers.
num_cols = ['age', 'fnlwgt', 'education_num', 'capital_gain', 'capital_loss', 'hours_per_week']
for col in num_cols:
    data = data.withColumn(col, data[col].cast("integer"))

# Generate base pipeline: feature engineering only (no classifier stage).
# FIX: removed the dead `pipeline = Pipeline(stages=[])` placeholder — it was
# immediately overwritten below and never used.
# `assemble_features` is the VectorAssembler defined earlier in this file.
basePipeline = [StringIndexer_workclass, StringIndexer_education, StringIndexer_marital_status, StringIndexer_occupation,
                StringIndexer_relationship, StringIndexer_race, StringIndexer_sex, StringIndexer_native_country,
                OneHotEncoder_workclass, OneHotEncoder_education, OneHotEncoder_marital_status,
                OneHotEncoder_occupation, OneHotEncoder_relationship, OneHotEncoder_race, OneHotEncoder_sex, OneHotEncoder_native_country,
                StringIndexer_income, assemble_features]

pipeline = Pipeline(stages=basePipeline)
pipelineModel = pipeline.fit(data)
# Despite the name, `model` here is the transformed DataFrame, which now
# contains the 'income_class' label and the assembled 'features' vector.
model = pipelineModel.transform(data)

model.take(1)

# Rebuild a minimal (label, features) DataFrame so downstream estimators and
# evaluators can use the conventional default column name 'label'.
from pyspark.ml.linalg import DenseVector
input_data = model.rdd.map(lambda x: (x["income_class"], DenseVector(x["features"])))

final_df = spark.createDataFrame(input_data, ["label", "features"])

final_df.show(2)

from time import time  # FIX: time() is called below but never imported in this script

# 80/20 train/test split with a fixed seed for reproducibility.
training_data, testing_data = final_df.randomSplit([0.8, 0.2], 3)

# Plain logistic regression on the (label, features) DataFrame — no pipeline
# needed now, because feature engineering was already applied above.
log_reg = LogisticRegression(maxIter=5, featuresCol='features', labelCol='label')

# Grid over regularization strength and the elastic-net mixing parameter.
paramGrid = ParamGridBuilder().addGrid(log_reg.regParam, [0.02, 0.08])\
            .addGrid(log_reg.elasticNetParam, [0.2, 0.6]).build()

# Accuracy-based evaluator used by the cross-validator for model selection.
evaluator_lr = MulticlassClassificationEvaluator(labelCol='label', metricName='accuracy')

crossval = CrossValidator(estimator=log_reg,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator_lr,
                          numFolds=2)

start_time = time()

# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(training_data)

end_time = time()

training_time = end_time - start_time

print("The time taken to train the data is: %0.3f seconds" %training_time)

# Make predictions on testing data and calculate evaluation metrics.
prediction = cvModel.transform(testing_data)

# FIX: keep the 'label' column in the selected output — evaluator_lr was built
# with labelCol='label', so selecting it away would make evaluate() raise
# "label does not exist". 'rawPrediction' is kept for the AUC computation.
output = prediction.select("features", "rawPrediction", "probability", "prediction", "label")

acc = evaluator_lr.evaluate(output, {evaluator_lr.metricName: "accuracy"})
f1 = evaluator_lr.evaluate(output, {evaluator_lr.metricName: "f1"})
weightedPrecision = evaluator_lr.evaluate(output, {evaluator_lr.metricName: "weightedPrecision"})
weightedRecall = evaluator_lr.evaluate(output, {evaluator_lr.metricName: "weightedRecall"})
# FIX: evaluator_lr was constructed with metricName='accuracy', so calling
# evaluate() with no override returned accuracy again — not an AUC. Compute a
# real area-under-ROC with BinaryClassificationEvaluator (assumed imported
# earlier in this file, where it is already used).
auc = BinaryClassificationEvaluator(labelCol='label').evaluate(output)

print(acc)
print(f1)
print(weightedPrecision)
print(weightedRecall)
print(auc)