Spark LinearRegressionSummary "normal" 摘要

Spark LinearRegressionSummary "normal" summary

根据 LinearRegressionSummary (Spark 2.1.0 JavaDoc),p 值仅适用于 "normal" 求解器。

This value is only available when using the "normal" solver.

"normal" 求解器是什么鬼?

我正在这样做:

import org.apache.spark.ml.{Pipeline, PipelineModel} 
import org.apache.spark.ml.evaluation.RegressionEvaluator 
import org.apache.spark.ml.feature.VectorAssembler 
import org.apache.spark.ml.regression.LinearRegressionModel 
import org.apache.spark.ml.tuning.{CrossValidator, CrossValidatorModel, ParamGridBuilder} 
import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.{DataFrame, SparkSession}
    .
    .
    .
val (trainingData, testData): (DataFrame, DataFrame) = 
  com.acme.pta.accuracy.Util.splitData(output, testProportion)
    .
    .
    .
val lr = 
  new org.apache.spark.ml.regression.LinearRegression()
  .setSolver("normal").setMaxIter(maxIter)

val pipeline = new Pipeline()
  .setStages(Array(lr))

val paramGrid = new ParamGridBuilder()
  .addGrid(lr.elasticNetParam, Array(0.2, 0.4, 0.8, 0.9))
  .addGrid(lr.regParam, Array(0,6, 0.3, 0.1, 0.01))
  .build()

val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(numFolds) // Use 3+ in practice

val cvModel: CrossValidatorModel = cv.fit(trainingData)

val pipelineModel: PipelineModel = cvModel.bestModel.asInstanceOf[PipelineModel]
val lrModel: LinearRegressionModel = 
  pipelineModel.stages(0).asInstanceOf[LinearRegressionModel]

val modelSummary = lrModel.summary
Holder.log.info("lrModel.summary: " + modelSummary)
try {
  Holder.log.info("feature p values: ")
  // Exception occurs on line below.
  val featuresAndPValues = features.zip(lrModel.summary.pValues)
  featuresAndPValues.foreach(
    (featureAndPValue: (String, Double)) => 
    Holder.log.info(
      "feature: " + featureAndPValue._1 + ": " + featureAndPValue._2))
} catch {
  case _: java.lang.UnsupportedOperationException 
            => Holder.log.error("Cannot compute p-values")
}

我还在收到 UnsupportedOperationException

异常信息为:

No p-value available for this LinearRegressionModel

我还需要做些什么吗?我正在使用

  "org.apache.spark" %% "spark-mllib" % "2.1.1"

该版本是否支持 pValues?

已更新

tl;dr

解决方案 1

在正常情况下 LinearRegression pValues 和其他 "normal" 统计数据仅在参数 elasticNetParamregParam 之一为零时才会出现。所以你可以改变

.addGrid( lr.elasticNetParam, Array( 0.0 ) )

.addGrid( lr.regParam, Array( 0.0 ) )

解决方案 2

制作 LinearRegression 的自定义版本,明确使用

  1. "normal" 回归求解器。
  2. Cholesky WeightedLeastSquares.
  3. 求解器

我将这个 class 作为 ml.regression 包的扩展。

package org.apache.spark.ml.regression

import scala.collection.mutable

import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
import org.apache.spark.ml.feature.Instance
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.optim.WeightedLeastSquares
import org.apache.spark.ml.param.{Param, ParamMap, ParamValidators}
import org.apache.spark.ml.util._
import org.apache.spark.mllib.linalg.VectorImplicits._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.functions._

class CholeskyLinearRegression ( override val uid: String )
    extends Regressor[ Vector, CholeskyLinearRegression, LinearRegressionModel ]
    with LinearRegressionParams with DefaultParamsWritable with Logging {

    import CholeskyLinearRegression._

    def this() = this(Identifiable.randomUID("linReg"))

    def setRegParam(value: Double): this.type = set(regParam, value)
    setDefault(regParam -> 0.0)

    def setFitIntercept(value: Boolean): this.type = set(fitIntercept, value)
    setDefault(fitIntercept -> true)

    def setStandardization(value: Boolean): this.type = set(standardization, value)
    setDefault(standardization -> true)

    def setElasticNetParam(value: Double): this.type = set(elasticNetParam, value)
    setDefault(elasticNetParam -> 0.0)

    def setMaxIter(value: Int): this.type = set(maxIter, value)
    setDefault(maxIter -> 100)

    def setTol(value: Double): this.type = set(tol, value)
    setDefault(tol -> 1E-6)

    def setWeightCol(value: String): this.type = set(weightCol, value)

    def setSolver(value: String): this.type = set(solver, value)
    setDefault(solver -> Auto)

    def setAggregationDepth(value: Int): this.type = set(aggregationDepth, value)
    setDefault(aggregationDepth -> 2)

    override protected def train(dataset: Dataset[_]): LinearRegressionModel = {

        // Extract the number of features before deciding optimization solver.
        val numFeatures = dataset.select(col($(featuresCol))).first().getAs[Vector](0).size
        val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol))

        val instances: RDD[Instance] = 
            dataset
            .select( col( $(labelCol) ), w, col( $(featuresCol) ) )
            .rdd.map {
                case Row(label: Double, weight: Double, features: Vector) =>
                Instance(label, weight, features)
            }

        // if (($(solver) == Auto &&
        //   numFeatures <= WeightedLeastSquares.MAX_NUM_FEATURES) || $(solver) == Normal) {
        // For low dimensional data, WeightedLeastSquares is more efficient since the
        // training algorithm only requires one pass through the data. (SPARK-10668)

        val optimizer = new WeightedLeastSquares( 
            $(fitIntercept), 
            $(regParam),
            elasticNetParam = $(elasticNetParam), 
            $(standardization), 
            true,
            solverType = WeightedLeastSquares.Cholesky, 
            maxIter = $(maxIter), 
            tol = $(tol)
        )

        val model = optimizer.fit(instances)

        val lrModel = copyValues(new LinearRegressionModel(uid, model.coefficients, model.intercept))
        val (summaryModel, predictionColName) = lrModel.findSummaryModelAndPredictionCol()

        val trainingSummary = new LinearRegressionTrainingSummary(
            summaryModel.transform(dataset),
            predictionColName,
            $(labelCol),
            $(featuresCol),
            summaryModel,
            model.diagInvAtWA.toArray,
            model.objectiveHistory
        )

        lrModel
        .setSummary( Some( trainingSummary ) )

        lrModel
    }

    override def copy(extra: ParamMap): CholeskyLinearRegression = defaultCopy(extra)
}

object CholeskyLinearRegression 
    extends DefaultParamsReadable[CholeskyLinearRegression] {

    override def load(path: String): CholeskyLinearRegression = super.load(path)

    val MAX_FEATURES_FOR_NORMAL_SOLVER: Int = WeightedLeastSquares.MAX_NUM_FEATURES

    /** String name for "auto". */
    private[regression] val Auto = "auto"

    /** String name for "normal". */
    private[regression] val Normal = "normal"

    /** String name for "l-bfgs". */
    private[regression] val LBFGS = "l-bfgs"

    /** Set of solvers that LinearRegression supports. */
    private[regression] val supportedSolvers = Array(Auto, Normal, LBFGS)
}

你所要做的就是将它粘贴到项目中的单独文件中,并在你的代码中将 LinearRegression 更改为 CholeskyLinearRegression

val lr = new CholeskyLinearRegression() // new LinearRegression()
        .setSolver( "normal" )
        .setMaxIter( maxIter )

它使用非零参数并给出 pValues。在以下参数网格上测试。

val paramGrid = new ParamGridBuilder()
        .addGrid( lr.elasticNetParam, Array( 0.2, 0.4, 0.8, 0.9 ) )
        .addGrid( lr.regParam, Array( 0.6, 0.3, 0.1, 0.01 ) )
        .build()

全面调查

我最初认为主要问题是模型没有完全保留。在 CrossValidator 中拟合后不会保留经过训练的模型。这是可以理解的,因为内存消耗。 JIRA 中正在进行 debate on how should it be resolved. Issue

您可以在评论部分看到我试图从最佳模型中提取参数以便再次 运行 它。然后我发现模型摘要是可以的,它只是一些参数 diagInvAtWa 的长度为 1 并且基本上为零。

对于岭回归或 Tikhonov 正则化 (elasticNet = 0) 和任何 regParam pValues 和其他 "normal" 统计数据都可以计算,但对于 Lasso 方法和介于两者之间的东西(弹性网)则不能. regParam = 0 也是如此:计算了任何 elasticNet pValues。

这是为什么

LinearRegression uses Weighted Least Square optimizer for "normal" solver with solverType = WeightedLeastSquares.Auto. This optimizer has two options 对于求解器:QuasiNewtonCholesky。只有当 regParamelasticNetParam 都非零时才会选择前者。

val solver = if (
    ( solverType == WeightedLeastSquares.Auto && 
        elasticNetParam != 0.0 && 
        regParam != 0.0 ) ||
    ( solverType == WeightedLeastSquares.QuasiNewton ) ) {

    ...
    new QuasiNewtonSolver(fitIntercept, maxIter, tol, effectiveL1RegFun)
} else {
    new CholeskySolver
}

因此在您的参数网格中,将始终使用 QuasiNewtonSolver,因为没有 regParamelasticNetParam 的组合,其中一个为零。

我们知道,为了获得 pValues 和其他 "normal" 统计数据,例如 t-statistic 或 std。系数误差矩阵 (A^T * W * A)^-1 (diagInvAtWA) 的对角线不能是只有一个零的向量。此条件在 pValues.

的定义中设置

diagInvAtWA 是压缩上三角矩阵 (solution.aaInv) 的对角元素向量。

val diagInvAtWA = solution.aaInv.map { inv => ...

对于Cholesky solver它是calculated but for QuasiNewton not. Second parameter对于NormalEquationSolution就是这个矩阵。

从技术上讲,您可以使用

制作自己的 LinearRegression 版本

复制

在此示例中,我使用了来自 here 的数据 sample_linear_regression_data.txt

复制完整代码

import org.apache.spark._

import org.apache.spark.ml.{Pipeline, PipelineModel} 
import org.apache.spark.ml.evaluation.{RegressionEvaluator, BinaryClassificationEvaluator}
import org.apache.spark.ml.feature.VectorAssembler 
import org.apache.spark.ml.regression.{LinearRegressionModel, LinearRegression}
import org.apache.spark.ml.tuning.{CrossValidator, CrossValidatorModel, ParamGridBuilder} 
import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.ml.param.ParamMap

object Main {

    def main( args: Array[ String ] ): Unit = {

        val spark =
            SparkSession
            .builder()
            .appName( "SO" )
            .master( "local[*]" )
            .config( "spark.driver.host", "localhost" )
            .getOrCreate()

        import spark.implicits._

        val data = 
            spark
            .read
            .format( "libsvm" )
            .load( "./sample_linear_regression_data.txt" )

        val Array( training, test ) = 
            data
            .randomSplit( Array( 0.9, 0.1 ), seed = 12345 )

        val maxIter = 10;

        val lr = new LinearRegression()
            .setSolver( "normal" )
            .setMaxIter( maxIter )

        val paramGrid = new ParamGridBuilder()
            // .addGrid( lr.elasticNetParam, Array( 0.2, 0.4, 0.8, 0.9 ) )
            .addGrid( lr.elasticNetParam, Array( 0.0 ) )
            .addGrid( lr.regParam, Array( 0.6, 0.3, 0.1, 0.01 ) )
            .build()

        val pipeline = new Pipeline()
            .setStages( Array( lr ) )

        val cv = new CrossValidator()
            .setEstimator( pipeline )
            .setEvaluator( new RegressionEvaluator )
            .setEstimatorParamMaps( paramGrid )
            .setNumFolds( 2 )  // Use 3+ in practice

        val cvModel = 
            cv
            .fit( training )

        val pipelineModel: PipelineModel = 
            cvModel
            .bestModel
            .asInstanceOf[ PipelineModel ]

        val lrModel: LinearRegressionModel = 
            pipelineModel
            .stages( 0 )
            .asInstanceOf[ LinearRegressionModel ]

        // Technically there is a way to use exact ParamMap
        // to build a new LR but for the simplicity I'll 
        // get and set them explicitly

        // lrModel.params.foreach( ( param ) => {

        //     println( param )
        // } )

        // val bestLr = new LinearRegression()
        //     .setSolver( "normal" )
        //     .setMaxIter( maxIter )
        //     .setRegParam( lrModel.getRegParam )
        //     .setElasticNetParam( lrModel.getElasticNetParam )

        // val bestLrModel = bestLr.fit( training )

        val modelSummary = 
            lrModel
            .summary

        println( "lrModel pValues: " + modelSummary.pValues.mkString( ", " ) )

        spark.stop()
    }
}

原创

求解器算法共有三种available:

  • l-bfgs - 有限内存 Broyden–Fletcher–Goldfarb–Shanno 算法,这是一种有限内存准牛顿优化 method.
  • normal - 使用 Normal Equation as an analytical solution to the linear regression problem. It is basically a weighted least squares approach or reweighted least squares approach.
  • auto - 自动选择求解器算法。将尽可能使用 Normal Equations 求解器,但这将在需要时自动回退到迭代优化方法

coefficientStandardErrorstValuespValues 仅在使用 "normal" 求解器时可用,因为它们都是基于 diagInvAtWA - 的对角线矩阵 (A^T * W * A)^-1.