How to provide multiple columns to setInputCol()
I am new to Spark Machine Learning and I want to pass multiple columns as features. In my code below I only pass the Date column into the feature pipeline, but now I want to pass both the Userid and Date columns as features. I tried using Vector, but it only supports the Double data type, whereas in my case I have Int and String.
I would appreciate any suggestion/solution or code sample that meets this requirement.
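For reference, the limitation mentioned above: mllib's Vectors.dense accepts only Double elements, so an Int or String column cannot be packed into a Vector without converting it first. A minimal illustration with made-up values:

import org.apache.spark.mllib.linalg.Vectors
val v = Vectors.dense(1.0, 2.0)       // fine: elements are Doubles
// Vectors.dense(1, "1-Jan-2017")     // does not compile: a String cannot be a Vector element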
Code:
import scala.beans.BeanInfo
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.sql.{Row, SQLContext}

// Training data: Userid, Date and the label (SwipeIntime)
case class LabeledDocument(Userid: Double, Date: String, label: Double)
val training = spark.read.option("inferSchema", true).csv("/root/Predictiondata3.csv").toDF("Userid", "Date", "label").as[LabeledDocument]

// Only the Date column is fed into the feature pipeline here
val tokenizer = new Tokenizer().setInputCol("Date").setOutputCol("words")
val hashingTF = new HashingTF().setNumFeatures(1000).setInputCol(tokenizer.getOutputCol).setOutputCol("features")
val lr = new LinearRegression().setMaxIter(100).setRegParam(0.001).setElasticNetParam(0.0001)
val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))
val model = pipeline.fit(training.toDF())

// Test documents to score
case class Document(Userid: Integer, Date: String)
val test = sc.parallelize(Seq(Document(4, "04-Jan-18"), Document(5, "01-Jan-17"), Document(2, "03-Jan-17")))
model.transform(test.toDF()).show()
The input data contains the following columns:
Userid,Date,SwipeIntime
1, 1-Jan-2017,9.30
1, 2-Jan-2017,9.35
1, 3-Jan-2017,9.45
1, 4-Jan-2017,9.26
2, 1-Jan-2017,9.37
2, 2-Jan-2017,9.35
2, 3-Jan-2017,9.45
2, 4-Jan-2017,9.46
I found a solution: index the String Date column with StringIndexer and then combine it with Userid into a single feature vector using VectorAssembler.
import scala.beans.BeanInfo
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.attribute.NominalAttribute
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer, StringIndexer, VectorAssembler}
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{StructType, StructField, StringType}

case class LabeledDocument(Userid: Double, Date: String, label: Double)
val trainingData = spark.read.option("inferSchema", true).csv("/root/Predictiondata10.csv").toDF("Userid", "Date", "label").as[LabeledDocument]

// Convert the String Date column into a numeric category index
val DateIndexer = new StringIndexer().setInputCol("Date").setOutputCol("DateCat")
val indexed = DateIndexer.fit(trainingData).transform(trainingData)

// Combine the indexed Date and the numeric Userid into a single feature vector
val assembler = new VectorAssembler().setInputCols(Array("DateCat", "Userid")).setOutputCol("rawfeatures")
val output = assembler.transform(indexed)

// Collect the assembled rows and rebuild a DataFrame with rawfeatures as a String
val rows = output.select("Userid", "Date", "label", "DateCat", "rawfeatures").collect()
val asTuple = rows.map(a => (a.getInt(0), a.getString(1), a.getDouble(2), a.getDouble(3), a(4).toString()))
val r2 = sc.parallelize(asTuple).toDF("Userid", "Date", "label", "DateCat", "rawfeatures")

// Split into training and test sets
val Array(training, testData) = r2.randomSplit(Array(0.7, 0.3))

// Tokenize and hash the stringified feature vector, then fit a linear regression
val tokenizer = new Tokenizer().setInputCol("rawfeatures").setOutputCol("words")
val hashingTF = new HashingTF().setNumFeatures(1000).setInputCol(tokenizer.getOutputCol).setOutputCol("features")
val lr = new LinearRegression().setMaxIter(100).setRegParam(0.001).setElasticNetParam(0.0001)
val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))
val model = pipeline.fit(training.toDF())
model.transform(testData.toDF()).show()
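A follow-up note on the design: since VectorAssembler already produces a numeric feature vector, the assembled column can also be fed straight into LinearRegression via setFeaturesCol, skipping the detour of stringifying the vector and re-hashing it with Tokenizer/HashingTF. A minimal sketch of that variant, assuming the same trainingData defined above and a Spark 2.x ML pipeline:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.ml.regression.LinearRegression

// Index the String Date column into a numeric category
val dateIndexer = new StringIndexer().setInputCol("Date").setOutputCol("DateCat")
// Assemble the indexed Date and the numeric Userid into one feature vector
val assembler = new VectorAssembler().setInputCols(Array("DateCat", "Userid")).setOutputCol("features")
// Regress the label directly on the assembled vector
val lr2 = new LinearRegression().setMaxIter(100).setRegParam(0.001).setElasticNetParam(0.0001).setFeaturesCol("features").setLabelCol("label")

val directPipeline = new Pipeline().setStages(Array(dateIndexer, assembler, lr2))
val Array(train2, test2) = trainingData.randomSplit(Array(0.7, 0.3))
val directModel = directPipeline.fit(train2.toDF())
directModel.transform(test2.toDF()).show()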