Scala 解析来自 kafka 的 json 记录

Scala Parsing a json record coming from kafka

我正在将 Spark Streaming 与 Scala 结合使用,并且我正在从 kafka 获取 json 记录。我想解析它,以便我可以获得值(日期时间和质量)和过程。

这是我的代码:

stream.foreachRDD(rdd => {
  rdd.collect().foreach(i =>
    println(msgParse(i.value()).quality)
  )
})

我有这个案例 class 和我的解析函数:

case class diskQuality(datetime: String , quality : Double) extends  Serializable

def msgParse(value: String): diskQuality = {

  import org.json4s._
  import org.json4s.native.JsonMethods._

  implicit val formats = DefaultFormats

  val res = parse(value).extract[diskQuality]
  return res

}

我添加了这个依赖:

libraryDependencies += "org.json4s" % "json4s-native_2.10" % "3.2.4"

我收到的记录格式如下:

"{\"datetime\":\"14-05-2017 14:18:30\",\"quality\":92.6}"

但是我得到这个错误:

Exception in thread "main" org.json4s.ParserUtil$ParseException: expected field or array Near: ,\"quality\":100.0}"

编辑:

当我尝试使用相同的函数解析以下内容时,它起作用了。但即使 kafka 消息采用相同的格式,它仍然会给出相同的错误:

val test = "{\"datetime\":\"14-05-2017 14:18:30\",\"quality\":92.6}"

我正在使用 scalaVersion := "2.10.6" 和 json4s-native_2.10"

任何帮助将不胜感激。谢谢你的时间

看起来您的 Kafka Producer 端有问题,您必须通过替换转义引号以以下格式结束:

{"datetime":"14-05-2017 14:18:30","quality":92.6}

它将为您提供格式正确的 JSON 字符串。