Explode sparse features vector into separate columns
In my Spark DataFrame I have a column containing the output of a CountVectoriser transformation; it is in sparse vector format. What I am trying to do is to 'explode' this column back into a dense vector and then into its component rows (so that it can be used for scoring by an external model).
I know there are 40 features in the column, so following the example, I tried:
import org.apache.spark.sql.functions.udf
import org.apache.spark.mllib.linalg.Vector
// convert sparse vector to a dense vector, and then to array<double>
val vecToSeq = udf((v: Vector) => v.toArray)
// Prepare a list of columns to create
val exprs = (0 until 39).map(i => $"_tmp".getItem(i).alias(s"exploded_col$i"))
testDF.select(vecToSeq($"features").alias("_tmp")).select(exprs:_*)
However, I get a strange error (see the full error below):
data type mismatch: argument 1 requires vector type, however, 'features' is of vector type.;
Now it seems that CountVectoriser may be creating a vector of type ml.linalg.Vector, so I also tried importing:
import org.apache.spark.ml.linalg.{Vector, DenseVector, SparseVector}
I then get an error caused by:
Caused by: java.lang.ClassCastException: org.apache.spark.ml.linalg.SparseVector cannot be cast to org.apache.spark.sql.Row
I also tried converting the ml vector by changing the UDF to:
val vecToSeq = udf((v: Vector) => org.apache.spark.mllib.linalg.Vectors.fromML(v.toDense).toArray )
and got a similar cannot be cast to org.apache.spark.sql.Row error. Can anyone tell me why this isn't working? Is there an easier way to explode a sparse vector in a DataFrame into separate columns? I've spent hours on this and cannot figure it out.
Edit: the schema shows the features column as a vector:
|-- features: vector (nullable = true)
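(Note that the printed schema is ambiguous here: both the old mllib VectorUDT and the new ml VectorUDT display simply as vector. A quick diagnostic sketch, reusing the column name above, to see which concrete class is actually stored:)
// Both UDTs print as "vector" in the schema, so inspect the runtime classes instead
println(testDF.schema("features").dataType.getClass.getName)
println(testDF.select("features").head.get(0).getClass.getName)  // e.g. org.apache.spark.ml.linalg.SparseVector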
Full error trace:
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(features)' due to data type mismatch: argument 1 requires vector type, however, 'features' is of vector type.;;
Project [UDF(features#325) AS _tmp#463]
. . .
org.apache.spark.sql.cassandra.CassandraSourceRelation@47eae91d
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$$anonfun$apply.applyOrElse(CheckAnalysis.scala:93)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$$anonfun$apply.applyOrElse(CheckAnalysis.scala:85)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp.apply(QueryPlan.scala:268)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp.apply(QueryPlan.scala:268)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression(QueryPlan.scala:279)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform(QueryPlan.scala:289)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform.apply(QueryPlan.scala:293)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform(QueryPlan.scala:293)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun.apply(QueryPlan.scala:298)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:298)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:268)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis.apply(CheckAnalysis.scala:85)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis.apply(CheckAnalysis.scala:78)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:78)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:91)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:52)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:66)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:2872)
at org.apache.spark.sql.Dataset.select(Dataset.scala:1153)
at uk.nominet.renewals.prediction_test$.prediction_test(prediction_test.scala:292)
at
When dealing with cases like this, I usually break things down step by step to see where the problem lies.
First, let's set up a DataFrame:
import org.apache.spark.ml.feature.CountVectorizer
import org.apache.spark.ml.linalg.Vector
val df=sc.parallelize(Seq((1L, Seq("word1", "word2")))).toDF("id", "words")
val countModel = new CountVectorizer().setInputCol("words").setOutputCol("feature").fit(df)
val testDF = countModel.transform(df)
testDF.show
+---+--------------+-------------------+
| id| words| feature|
+---+--------------+-------------------+
| 1|[word1, word2]|(2,[0,1],[1.0,1.0])|
+---+--------------+-------------------+
Now, what I want is to select, say, the first column of the features, that is, to extract the first coordinate of the feature vector. That can be written as v(0). So I want my DataFrame to have a column containing v(0), where v is the content of the feature column. I can use a user-defined function for that:
val firstColumnExtractor = udf((v: Vector) => v(0))
Then I try to add this column to my testDF:
testDF.withColumn("feature_0", firstColumnExtractor($"feature")).show
+---+--------------+-------------------+---------+
| id| words| feature|feature_0|
+---+--------------+-------------------+---------+
| 1|[word1, word2]|(2,[0,1],[1.0,1.0])| 1.0|
+---+--------------+-------------------+---------+
Note that I could also do it this way (as far as I can tell it's just a matter of style):
testDF.select(firstColumnExtractor($"feature").as("feature_0")).show
This works, but it would mean repeating a lot of work. Let's automate it.
First, I can generalize the extraction function to work on any index. Let's create a higher-order function (a function that creates functions):
def columnExtractor(idx: Int) = udf((v: Vector) => v(idx))
Now I can rewrite the previous example as:
testDF.withColumn("feature_0", columnExtractor(0)($"feature")).show
OK, so now I could do this:
testDF.withColumn("feature_0", columnExtractor(0)($"feature"))
.withColumn("feature_1", columnExtractor(1)($"feature"))
That works for 1 dimension, but what about 39 of them? Well, let's automate some more. The above is effectively a foldLeft operation over each dimension:
(0 to 39).foldLeft(testDF)((df, idx) => df.withColumn("feature_"+idx, columnExtractor(idx)($"feature")))
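On the toy frame above there are of course only two features, so in practice you would derive the upper bound from the fitted model rather than hard-coding 39; a small sketch reusing the countModel and columnExtractor defined earlier:
// The fitted CountVectorizerModel knows its vocabulary, and hence the vector size
val nFeatures = countModel.vocabulary.length
val expandedDF = (0 until nFeatures).foldLeft(testDF)(
  (df, idx) => df.withColumn("feature_" + idx, columnExtractor(idx)($"feature")))
expandedDF.show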
which is just another way of writing the same thing with multiple selects:
val featureCols = (0 to 1).map(idx => columnExtractor(idx)($"feature").as("feature_"+idx))
testDF.select((col("*") +: featureCols):_*).show
+---+--------------+-------------------+---------+---------+
| id| words| feature|feature_0|feature_1|
+---+--------------+-------------------+---------+---------+
| 1|[word1, word2]|(2,[0,1],[1.0,1.0])| 1.0| 1.0|
+---+--------------+-------------------+---------+---------+
Now, for performance reasons, you might want to convert the base Vector to an array of coordinates (or a DenseVector). Feel free to do so. I suspect a DenseVector and an Array will be very close performance-wise, so I would write it this way:
// A function to densify the feature vector
val toDense = udf((v:Vector) => v.toDense)
// Replace testDF's feature column with its dense equivalent
val denseDF = testDF.withColumn("feature", toDense($"feature"))
// Work on denseDF as we did on testDF
denseDF.select((col("*") +: featureCols):_*).show
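As an aside, if you happen to be on Spark 3.0 or later, the built-in org.apache.spark.ml.functions.vector_to_array avoids writing a UDF at all; a minimal sketch on the same frame (assuming Spark 3.0+):
import org.apache.spark.ml.functions.vector_to_array
import org.apache.spark.sql.functions.col

// Convert the vector column to array<double>, then pick out each position
val arrDF = testDF.withColumn("feature_arr", vector_to_array(col("feature")))
val arrCols = (0 until 2).map(i => col("feature_arr").getItem(i).alias(s"feature_$i"))
arrDF.select((col("*") +: arrCols): _*).show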
Your import statements seem to be the problem. As you have seen, CountVectorizer uses the ml package vectors, so all vector imports should also use that package. Make sure you are not using any imports from the old mllib package. These include:
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.SparseVector
import org.apache.spark.mllib.linalg.DenseVector
Some methods only exist in the mllib package, so if you really do need to use that kind of vector, you can rename it on import (since the name would otherwise clash with the ml vectors). For example:
import org.apache.spark.mllib.linalg.{Vector => mllibVector}
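For instance, if some older API still requires the mllib type, a minimal conversion sketch (the helper name is illustrative) could look like:
import org.apache.spark.ml.linalg.{Vector => MLVector}
import org.apache.spark.mllib.linalg.{Vector => MLlibVector, Vectors => MLlibVectors}

// Convert an ml vector (what CountVectorizer emits) into its mllib counterpart
def asMLlib(v: MLVector): MLlibVector = MLlibVectors.fromML(v)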
After fixing all the imports, your code should run. Test:
val df = Seq((1L, Seq("word1", "word2", "word3")), (2L, Seq("word2", "word4"))).toDF("id", "words")
val countVec = new CountVectorizer().setInputCol("words").setOutputCol("features")
val testDF = countVec.fit(df).transform(df)
which will give a test DataFrame as follows:
+---+--------------------+--------------------+
| id| words| features|
+---+--------------------+--------------------+
| 1|[word1, word2, wo...|(4,[0,2,3],[1.0,1...|
| 2| [word2, word4]| (4,[0,1],[1.0,1.0])|
+---+--------------------+--------------------+
Now to give each index its own column:
val vecToSeq = udf((v: Vector) => v.toArray)
val exprs = (0 until 4).map(i => $"features".getItem(i).alias(s"exploded_col$i"))
val df2 = testDF.withColumn("features", vecToSeq($"features")).select(exprs:_*)
Resulting DataFrame:
+-------------+-------------+-------------+-------------+
|exploded_col0|exploded_col1|exploded_col2|exploded_col3|
+-------------+-------------+-------------+-------------+
| 1.0| 0.0| 1.0| 1.0|
| 1.0| 1.0| 0.0| 0.0|
+-------------+-------------+-------------+-------------+
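If you also want to keep the original id column next to the exploded features, just select it along with the generated expressions (reusing vecToSeq and exprs from above):
import org.apache.spark.sql.functions.col

// Keep id alongside the exploded feature columns
testDF.withColumn("features", vecToSeq($"features"))
  .select((col("id") +: exprs): _*)
  .show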