Spark error: Exception in thread "main" java.lang.UnsupportedOperationException

I am writing a Scala/Spark program that finds the maximum salary of an employee. The employee data is available in a CSV file, and the salary column uses a comma as a thousands separator and has a $ prefix, e.g. $74,628.00.

To handle the comma and the dollar sign, I wrote a parser function in Scala that splits each line on "," and then maps each column to individual variables that are assigned to a case class.

My parser function is shown below. To eliminate the comma and the dollar sign, I use the replace function to replace them with an empty string and then finally cast the result to Int.

def ParseEmployee(line: String): Classes.Employee = {
    val fields = line.split(",")
    val Name = fields(0)
    val JOBTITLE = fields(2)
    val DEPARTMENT = fields(3)
    val temp = fields(4)

    temp.replace(",","")//To eliminate the ,
    temp.replace("$","")//To remove the $
    val EMPLOYEEANNUALSALARY = temp.toInt //Type cast the string to Int

    Classes.Employee(Name, JOBTITLE, DEPARTMENT, EMPLOYEEANNUALSALARY)
  }

My case class looks like this:

case class Employee (
  Name: String,
  JOBTITLE: String,
  DEPARTMENT: String,
  EMPLOYEEANNUALSALARY: Number
)

My Spark DataFrame SQL query looks like this:

val empMaxSalaryValue = sc.sqlContext.sql("Select Max(EMPLOYEEANNUALSALARY) From EMP")
empMaxSalaryValue.show

When I run this program, I get the exception below:

Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for Number
- field (class: "java.lang.Number", name: "EMPLOYEEANNUALSALARY")
- root class: "Classes.Employee"
    at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:625)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun.apply(ScalaReflection.scala:619)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun.apply(ScalaReflection.scala:607)
    at scala.collection.TraversableLike$$anonfun$flatMap.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap.apply(TraversableLike.scala:241)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.immutable.List.flatMap(List.scala:344)
    at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:607)
    at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:438)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:71)
    at org.apache.spark.sql.Encoders$.product(Encoders.scala:275)
    at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:282)
    at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:272)
    at CalculateMaximumSalary$.main(CalculateMaximumSalary.scala:27)
    at CalculateMaximumSalary.main(CalculateMaximumSalary.scala)
  1. Any idea why I am getting this error? What mistake am I making here, and why can't the value be cast to a number?

  2. Is there a better way to handle this problem of finding the maximum salary of an employee?

Spark SQL provides only a limited number of Encoders, and these target concrete classes. Abstract classes like Number are not supported (at most they can be used with the limited binary Encoders).

Since you convert the value to Int anyway, just redefine the class:

case class Employee (
  Name: String,
  JOBTITLE: String,
  DEPARTMENT: String,
  EMPLOYEEANNUALSALARY: Int
)
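
For completeness, here is a minimal, self-contained sketch (assuming Spark 2.x and a local session) of how the redefined case class can be used end to end. Note that String.replace returns a new string, so the cleanup calls have to be chained or reassigned rather than discarded; the parseSalary helper, the sample rows, and the object name are hypothetical stand-ins for the CSV file from the question.

import org.apache.spark.sql.SparkSession

// Redefined case class from above, with the salary stored as Int.
case class Employee(
  Name: String,
  JOBTITLE: String,
  DEPARTMENT: String,
  EMPLOYEEANNUALSALARY: Int
)

object MaxSalaryExample {

  // Hypothetical cleanup of a salary string such as "$74,628.00":
  // chain the replace calls (their results are not discarded) and drop
  // the decimal part via toDouble before converting to Int.
  def parseSalary(raw: String): Int =
    raw.replace("$", "").replace(",", "").toDouble.toInt

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("MaxSalary").master("local[*]").getOrCreate()
    import spark.implicits._

    // Small in-memory sample standing in for the employee CSV file.
    val employees = Seq(
      Employee("Alice", "Engineer", "IT", parseSalary("$74,628.00")),
      Employee("Bob", "Analyst", "Finance", parseSalary("$81,250.00"))
    ).toDS()

    employees.createOrReplaceTempView("EMP")
    spark.sql("Select Max(EMPLOYEEANNUALSALARY) From EMP").show()

    spark.stop()
  }
}

With the salary field typed as Int, the same Max(EMPLOYEEANNUALSALARY) query from the question runs without the encoder error.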