Scala 解析来自 kafka 的 json 记录
Scala Parsing a json record coming from kafka
我正在将 Spark Streaming 与 Scala 结合使用,并且我正在从 kafka 获取 json 记录。我想解析它,以便我可以获得值(日期时间和质量)和过程。
这是我的代码:
stream.foreachRDD(rdd => {
rdd.collect().foreach(i =>
println(msgParse(i.value()).quality)
)
})
我有这个案例 class 和我的解析函数:
case class diskQuality(datetime: String , quality : Double) extends Serializable
def msgParse(value: String): diskQuality = {
import org.json4s._
import org.json4s.native.JsonMethods._
implicit val formats = DefaultFormats
val res = parse(value).extract[diskQuality]
return res
}
我添加了这个依赖:
libraryDependencies += "org.json4s" % "json4s-native_2.10" % "3.2.4"
我收到的记录格式如下:
"{\"datetime\":\"14-05-2017 14:18:30\",\"quality\":92.6}"
但是我得到这个错误:
Exception in thread "main" org.json4s.ParserUtil$ParseException: expected field or array Near: ,\"quality\":100.0}"
编辑:
当我尝试使用相同的函数解析以下内容时,它起作用了。但即使 kafka 消息采用相同的格式,它仍然会给出相同的错误:
val test = "{\"datetime\":\"14-05-2017 14:18:30\",\"quality\":92.6}"
我正在使用 scalaVersion := "2.10.6" 和 json4s-native_2.10"
任何帮助将不胜感激。谢谢你的时间
看起来您的 Kafka Producer 端有问题,您必须通过替换转义引号以以下格式结束:
{"datetime":"14-05-2017 14:18:30","quality":92.6}
它将为您提供格式正确的 JSON 字符串。
我正在将 Spark Streaming 与 Scala 结合使用,并且我正在从 kafka 获取 json 记录。我想解析它,以便我可以获得值(日期时间和质量)和过程。
这是我的代码:
stream.foreachRDD(rdd => {
rdd.collect().foreach(i =>
println(msgParse(i.value()).quality)
)
})
我有这个案例 class 和我的解析函数:
case class diskQuality(datetime: String , quality : Double) extends Serializable
def msgParse(value: String): diskQuality = {
import org.json4s._
import org.json4s.native.JsonMethods._
implicit val formats = DefaultFormats
val res = parse(value).extract[diskQuality]
return res
}
我添加了这个依赖:
libraryDependencies += "org.json4s" % "json4s-native_2.10" % "3.2.4"
我收到的记录格式如下:
"{\"datetime\":\"14-05-2017 14:18:30\",\"quality\":92.6}"
但是我得到这个错误:
Exception in thread "main" org.json4s.ParserUtil$ParseException: expected field or array Near: ,\"quality\":100.0}"
编辑:
当我尝试使用相同的函数解析以下内容时,它起作用了。但即使 kafka 消息采用相同的格式,它仍然会给出相同的错误:
val test = "{\"datetime\":\"14-05-2017 14:18:30\",\"quality\":92.6}"
我正在使用 scalaVersion := "2.10.6" 和 json4s-native_2.10"
任何帮助将不胜感激。谢谢你的时间
看起来您的 Kafka Producer 端有问题,您必须通过替换转义引号以以下格式结束:
{"datetime":"14-05-2017 14:18:30","quality":92.6}
它将为您提供格式正确的 JSON 字符串。