Can't find field in Spark DataFrame

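I am currently working on a Scala/Spark homework project in which I have to read in a CSV file containing a few thousand movie reviews as a DataFrame. I then have to analyze these reviews and train a model to detect whether a review is positive or negative. I will train these models using TF-IDF and Word2Vec. The problem I am having is that the code I have written so far cannot find the header field named "word", which is the output column of the regex tokenizer. My code is below, followed by the console output.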

I appreciate your help, as well as any pointers on how I could do this correctly, or better than what I am doing now.

import org.apache.spark._
//import spark.implicits._
import org.apache.spark.sql._
import org.apache.spark.sql.SparkSession
import java.io._
import scala.io._
import scala.collection.mutable.ListBuffer
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.rand
import org.apache.spark.rdd.RDD
import org.apache.spark.ml.feature.StopWordsRemover
import org.apache.spark.ml.feature.{RegexTokenizer, Tokenizer}

/*
DONE: step 1: loop through all the positive / negative reviews and label each (1 = Positive, 2 = Negative)
        during the loop, place the text that is read as a string into a DF.
step 2: check which words are common among different labels and text with each other (possibly remove stop words)
        this will satisfy the TF-IDF requirement
step 3: convert text into vectors and perform regression on the values
step 4: compare accuracy using the actual data (data for the above was using the test folder data)
*/

object Machine {
  def main(args: Array[String]) {
    val spark = SparkSession.builder.appName("Movie Review Manager").getOrCreate()
    println("Reading data...")
    val df = spark.read.format("csv").option("header", "true").load("movie_data.csv")
    val regexTokenizer = new RegexTokenizer().setInputCol("review").setOutputCol("word").setPattern("\\s")
    val remover = new StopWordsRemover().setInputCol("word").setOutputCol("feature")
    df.show()
    regexTokenizer.transform(df).show(false)
    df.collect()
    remover.transform(df).show(false)
    df.show()
    spark.stop()
  }
}

The console output is as follows:

Exception in thread "main" 2018-03-13 03:41:28 INFO  ContextCleaner:54 - Cleaned accumulator 125
java.lang.IllegalArgumentException: Field "word" does not exist.2018-03-13 03:41:28 INFO  ContextCleaner:54 - Cleaned accumulator 118

2018-03-13 03:41:28 INFO  ContextCleaner:54 - Cleaned accumulator 116
        at org.apache.spark.sql.types.StructType$$anonfun$apply.apply(StructType.scala:267)2018-03-13 03:41:28 INFO  ContextCleaner:54 - Cleaned accumulator 102

2018-03-13 03:41:28 INFO  ContextCleaner:54 - Cleaned accumulator 110
        at org.apache.spark.sql.types.StructType$$anonfun$apply.apply(StructType.scala:267)2018-03-13 03:41:28 INFO  ContextCleaner:54 - Cleaned accumulator 103

        at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
        at scala.collection.AbstractMap.getOrElse(Map.scala:59)
        at org.apache.spark.sql.types.StructType.apply(StructType.scala:266)
        at org.apache.spark.ml.feature.StopWordsRemover.transformSchema(StopWordsRemover.scala:111)
        at org.apache.spark.ml.feature.StopWordsRemover.transform(StopWordsRemover.scala:91)
        at Machine$.main(movieProgram.scala:44)
        at Machine.main(movieProgram.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:879)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain(SparkSubmit.scala:197)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:227)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:136)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
2018-03-13 03:41:28 INFO  SparkContext:54 - Invoking stop() from shutdown hook
2018-03-13 03:41:28 INFO  AbstractConnector:318 - Stopped Spark@3abfc4ed{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}

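You never store the result of the transformation performed by

new RegexTokenizer().setInputCol("review").setOutputCol("word").setPattern("\\s")

so it is not available as input to new StopWordsRemover().setInputCol("word").setOutputCol("feature").

regexTokenizer.transform(df) returns a new DataFrame that contains the "word" column; it does not modify df. Because you did not keep that tokenized DataFrame and instead passed the original df (where the "word" column does not exist) to the stop-word remover, you get the error: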

java.lang.IllegalArgumentException: Field "word" does not exist...
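
To make the cause visible, here is a minimal sketch that compares the columns before and after tokenizing. The object name `TransformDemo`, the helper `tokenize`, the `local[*]` master, and the two in-memory sample rows are assumptions made for illustration; they are not part of the original program.

```scala
import org.apache.spark.ml.feature.RegexTokenizer
import org.apache.spark.sql.{DataFrame, SparkSession}

object TransformDemo {
  // Returns a NEW DataFrame with an extra "word" column; the input is not modified.
  def tokenize(df: DataFrame): DataFrame =
    new RegexTokenizer()
      .setInputCol("review")
      .setOutputCol("word")
      .setPattern("\\s")
      .transform(df)

  def main(args: Array[String]): Unit = {
    // local[*] and the sample rows below are assumptions for a quick local run
    val spark = SparkSession.builder
      .appName("TransformDemo")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val df = Seq("a great movie", "a dull plot").toDF("review")
    val tokenized = tokenize(df)

    println(df.columns.mkString(", "))        // prints: review
    println(tokenized.columns.mkString(", ")) // prints: review, word

    spark.stop()
  }
}
```

Only `tokenized` has the "word" column, so that is the DataFrame the stop-word remover has to receive.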

The correct way to do it is as follows:

import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.feature.{RegexTokenizer, StopWordsRemover}

object Machine {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("Movie Review Manager").getOrCreate()
    println("Reading data...")
    val df = spark.read.format("csv").option("header", "true").load("movie_data.csv")
    val regexTokenizer = new RegexTokenizer().setInputCol("review").setOutputCol("word").setPattern("\\s")
    val remover = new StopWordsRemover().setInputCol("word").setOutputCol("feature")
    df.show(false)   // original dataframe
    // keep the tokenized result: it is the one that has the "word" column
    val tokenized = regexTokenizer.transform(df)
    tokenized.show(false)     // tokenized dataframe
    // feed the tokenized dataframe (not df) to the stop-word remover
    val removed = remover.transform(tokenized)
    removed.show(false)     // stopwords removed dataframe
    spark.stop()
  }
}
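
As an alternative to passing each intermediate DataFrame along by hand, the two stages could be chained with `org.apache.spark.ml.Pipeline`, which feeds the output of each stage into the next. This is only a sketch: the object name `PipelineSketch`, the helper `cleanReviews`, the `local[*]` master, and the sample row are assumptions for illustration, and the column names are the ones used in the question.

```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{RegexTokenizer, StopWordsRemover}
import org.apache.spark.sql.{DataFrame, SparkSession}

object PipelineSketch {
  // Tokenizes "review" into "word", then removes stop words into "feature".
  def cleanReviews(df: DataFrame): DataFrame = {
    val regexTokenizer = new RegexTokenizer()
      .setInputCol("review")
      .setOutputCol("word")
      .setPattern("\\s")
    val remover = new StopWordsRemover()
      .setInputCol("word")
      .setOutputCol("feature")

    // The pipeline passes the tokenizer's output DataFrame to the remover
    val pipeline = new Pipeline().setStages(Array(regexTokenizer, remover))
    pipeline.fit(df).transform(df)
  }

  def main(args: Array[String]): Unit = {
    // local[*] and the sample row are assumptions for a quick local run
    val spark = SparkSession.builder
      .appName("PipelineSketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val df = Seq("this is a great movie").toDF("review")
    cleanReviews(df).show(false)

    spark.stop()
  }
}
```

With a pipeline, later stages such as the TF-IDF or Word2Vec steps mentioned in the question could be appended to the same `setStages` array, provided each stage's input column matches the previous stage's output column.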

I hope this answer is helpful.