Spark DataFrame not respecting schema and considering everything as String
I have run into a problem that I have been stuck on for quite a while now.

I am using Spark 1.4 and Scala 2.10. I cannot upgrade at this point (large distributed infrastructure).

I have a file with a few hundred columns, only 2 of which are strings and the rest are all longs. I want to convert this data into a Label/Features data frame.

I have been able to get it into LibSVM format.

What I cannot do is get it into a Label/Features format, because I cannot use toDF() as shown here
https://spark.apache.org/docs/1.5.1/ml-ensembles.html
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt").toDF()
since it is not supported in 1.4.

So I first convert the txt file into a DataFrame, using something like this:
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

def getSFColumnDType(columnName: String): StructField = {
  if ((columnName == "strcol1") || (columnName == "strcol2"))
    StructField(columnName, StringType, false)
  else
    StructField(columnName, LongType, false)
}

def getDataFrameFromTxtFile(sc: SparkContext, staticfeatures_filepath: String, schemaConf: String): DataFrame = {
  val sfRDD = sc.textFile(staticfeatures_filepath)
  val sqlContext = new org.apache.spark.sql.SQLContext(sc)

  // reads a space-delimited string from the application.properties file
  val schemaString = readConf(Array(schemaConf)).get(schemaConf).getOrElse("")

  // Generate the schema based on the schema string
  val schema = StructType(
    schemaString.split(" ").map(fieldName => getSFColumnDType(fieldName)))

  val data = sfRDD
    .map(line => line.split(","))
    .map(p => Row.fromSeq(p.toSeq))

  var df = sqlContext.createDataFrame(data, schema)
  //schemaString.split(" ").drop(4)
  //.map(s => df = convertColumn(df, s, "int"))
  return df
}
When I do a df.na.drop() and df.printSchema() on this returned data frame, I get a perfect schema, like this:
root
|-- rand_entry: long (nullable = false)
|-- strcol1: string (nullable = false)
|-- label: long (nullable = false)
|-- strcol2: string (nullable = false)
|-- f1: long (nullable = false)
|-- f2: long (nullable = false)
|-- f3: long (nullable = false)
and so on till around f300
But, sadly, whatever I try to do with the df (see below), I always get a ClassCastException (java.lang.String cannot be cast to java.lang.Long):
val featureColumns = Array("f1","f2",....."f300")
assertEquals(-99,df.select("f1").head().getLong(0))
assertEquals(-99,df.first().get(4))
val transformeddf = new VectorAssembler()
.setInputCols(featureColumns)
.setOutputCol("features")
.transform(df)
So the bad part is that, even though the schema says Long, the df is still internally treating everything as a String.
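A quick way to see this (a minimal check, assuming index 4 is one of the long columns, as in the assertion above) is to print the runtime class of a value:

// Sketch: check the runtime class of a value in a supposedly Long column
println(df.first().get(4).getClass)   // prints class java.lang.String, even though printSchema() reports long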
Edit

Adding a simple example.

Say I have a file like this
1,A,20,P,-99,1,0,0,8,1,1,1,1,131153
1,B,23,P,-99,0,1,0,7,1,1,0,1,65543
1,C,24,P,-99,0,1,0,9,1,1,1,1,262149
1,D,7,P,-99,0,0,0,8,1,1,1,1,458759
and
sf-schema=f0 strCol1 f1 strCol2 f2 f3 f4 f5 f6 f7 f8 f9 f10 f11
(The column names don't really matter, so you can ignore this detail.)
All I want to do is create a Label/Features kind of data frame where my 3rd column becomes the label and the 5th to 11th columns become the features [Vector] column, so that I can follow the rest of the steps in https://spark.apache.org/docs/latest/ml-classification-regression.html#tree-ensembles.
I have also cast the columns as suggested by zero323:
val types = Map("strCol1" -> "string", "strCol2" -> "string")
.withDefault(_ => "bigint")
df = df.select(df.columns.map(c => df.col(c).cast(types(c)).alias(c)): _*)
df = df.drop("f0")
df = df.drop("strCol1")
df = df.drop("strCol2")
But the assertions and the VectorAssembler still fail.
featureColumns = Array("f2","f3",....."f11")
Here is the whole sequence of what I want to do after getting the df:
var transformeddf = new VectorAssembler()
.setInputCols(featureColumns)
.setOutputCol("features")
.transform(df)
transformeddf.show(2)
transformeddf = new StringIndexer()
.setInputCol("f1")
.setOutputCol("indexedF1")
.fit(transformeddf)
.transform(transformeddf)
transformeddf.show(2)
transformeddf = new VectorIndexer()
.setInputCol("features")
.setOutputCol("indexedFeatures")
.setMaxCategories(5)
.fit(transformeddf)
.transform(transformeddf)
The exception trace from ScalaIDE, right when it hits the VectorAssembler, is as follows:
java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long
at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:110)
at scala.math.Numeric$LongIsIntegral$.toDouble(Numeric.scala:117)
at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToDouble.apply(Cast.scala:364)
at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToDouble.apply(Cast.scala:364)
at org.apache.spark.sql.catalyst.expressions.Cast.eval(Cast.scala:436)
at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:118)
at org.apache.spark.sql.catalyst.expressions.CreateStruct$$anonfun$eval.apply(complexTypes.scala:75)
at org.apache.spark.sql.catalyst.expressions.CreateStruct$$anonfun$eval.apply(complexTypes.scala:75)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:244)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.catalyst.expressions.CreateStruct.eval(complexTypes.scala:75)
at org.apache.spark.sql.catalyst.expressions.CreateStruct.eval(complexTypes.scala:56)
at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun.apply(ScalaUdf.scala:72)
at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun.apply(ScalaUdf.scala:70)
at org.apache.spark.sql.catalyst.expressions.ScalaUdf.eval(ScalaUdf.scala:960)
at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:118)
at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:68)
at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:52)
at scala.collection.Iterator$$anon.next(Iterator.scala:328)
at scala.collection.Iterator$$anon.next(Iterator.scala:328)
at scala.collection.Iterator$$anon.next(Iterator.scala:312)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.execution.SparkPlan$$anonfun.apply(SparkPlan.scala:143)
at org.apache.spark.sql.execution.SparkPlan$$anonfun.apply(SparkPlan.scala:143)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1767)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1767)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
You get the ClassCastException because this is exactly what is supposed to happen. The schema argument is not used for automatic casting (some DataSources may use the schema that way, but not methods like createDataFrame). It only declares the types of the values that are stored in the rows. It is your responsibility to pass data that matches the schema, not the other way around.
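For instance, a minimal sketch of building rows whose values already match the declared schema (reusing sfRDD, schema, schemaString and getSFColumnDType from the question) might look like this:

// Sketch: convert each raw field to the type the schema declares before creating the Row
// (assumes the StructField/StringType imports from the question are in scope)
val fieldTypes = schemaString.split(" ").map(getSFColumnDType)

val typedRows = sfRDD
  .map(_.split(","))
  .map { fields =>
    Row.fromSeq(fields.zip(fieldTypes).map {
      case (value, StructField(_, StringType, _, _)) => value              // keep the two string columns as-is
      case (value, _)                                => value.trim.toLong  // everything else is declared long
    })
  }

val typedDf = sqlContext.createDataFrame(typedRows, schema)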
While the DataFrame shows the schema you declared, that schema is only validated at runtime, hence the runtime exception. If you want the data converted to specific types, you have to cast it explicitly.
First read all columns as StringType:
val rows = sc.textFile(staticfeatures_filepath)
.map(line => Row.fromSeq(line.split(",")))
val schema = StructType(
schemaString.split(" ").map(
columnName => StructField(columnName, StringType, false)
)
)
val df = sqlContext.createDataFrame(rows, schema)
Next cast the selected columns to the desired types:
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{LongType, StringType}

val types = Map("strcol1" -> StringType, "strcol2" -> StringType)
  .withDefault(_ => LongType)

val casted = df.select(df.columns.map(c => col(c).cast(types(c)).alias(c)): _*)
And use the assembler:
val transformeddf = new VectorAssembler()
.setInputCols(featureColumns)
.setOutputCol("features")
.transform(casted)
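As a quick sanity check (a sketch reusing the assertion from the question, under the assumption that f1 really is -99 in the first row), the casted frame should now hold longs:

casted.printSchema()                                   // f1 ... f300 now reported as long
assert(casted.select("f1").head().getLong(0) == -99L)  // no more ClassCastException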
You can simply perform steps 1 and 2 at once using spark-csv:
// As originally
val schema = StructType(
schemaString.split(" ").map(fieldName => getSFColumnDType(fieldName)))
val df = sqlContext
.read.schema(schema)
.format("com.databricks.spark.csv")
.option("header", "false")
.load(staticfeatures_filepath)
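Note that spark-csv is an external package, so it has to be on the classpath; one way to pull it in (the version below is an assumption, pick one built for Scala 2.10 and compatible with Spark 1.4) is:

spark-shell --packages com.databricks:spark-csv_2.10:1.2.0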