unsupportedOperationException 使用 Joda 时间将字符串转换为 DateTime 时出错

unsupportedOperationException Error converting string to DateTime using Joda time

我正在使用 joda.time.Datetime 库将字符串转换为日期时间字段,但它抛出不受支持的异常 这是主要的 class 代码:

//create new var with input data without header
var inputDataWithoutHeader: RDD[String] = dropHeader(inputFile)
var inputDF1 = inputDataWithoutHeader.map(_.split(",")).map{p =>
val dateYMD: DateTime = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").parseDateTime(p(8))
testData(dateYMD)}.toDF().show()

p(8) 是在 class testData 中定义的数据类型日期时间的列,该列的 CSV 数据的值类似于 2013-02-17 00:00:00

这是测试数据Class:

case class testData(StartDate: DateTime) { }

这是我得到的错误:

线程异常 "main"

java.lang.UnsupportedOperationException: Schema for type org.joda.time.DateTime is not supported
    at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:153)
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:29)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor.apply(ScalaReflection.scala:128)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor.apply(ScalaReflection.scala:126)
    at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:244)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:126)
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:29)
    at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:64)
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:29)
    at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:361)
    at org.apache.spark.sql.SQLImplicits.rddToDataFrameHolder(SQLImplicits.scala:47)
    at com.projs.poc.spark.ml.ProcessCSV$delayedInit$body.apply(ProcessCSV.scala:37)
  1. 如您在 the official documentation 中所读,Spark SQL 中的日期使用 java.sql.Timestamp 表示。如果你想使用 Joda 时间,你必须将输出转换为正确的类型

  2. SparkSQL 可以使用类型转换轻松处理标准日期格式:

    sc.parallelize(Seq(Tuple1("2016-01-11 00:01:02")))
      .toDF("dt")
      .select($"dt".cast("timestamp"))
    

感谢 zero323 提供的解决方案。我使用了 java.sql.Timestamp,这里是我修改的代码

val dateYMD: java.sql.Timestamp = new java.sql.Timestamp(DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").parseDateTime(p(8)).getMillis)
testData(dateYMD)}.toDF().show()

并将我的 class 更改为

case class testData(GamingDate: java.sql.Timestamp) { }

Scala spark 模式不明确支持日期时间。 您可以探索其他选项。他们是:

  1. 将日期时间转换为毫秒,您可以维护为长格式。

  2. 将日期时间转换为 unixtime(java 格式)

  3. 将日期时间转换为字符串。您可以随时使用 DateTime.parse("stringdatetime")

    改回 joda 日期时间
  4. 如果您仍想在 scala 模式中维护 joda 日期时间,那么您可以将数据帧转换为序列

    dataframe.rdd.map(r =>DateTime.parse(r(0).toString())).collect().toSeq