使用 json4s 解析 JSON 时引发不可序列化异常

Spark non-serializable exception when parsing JSON with json4s

我 运行 在我的 spark 作业中试图解析 json 时遇到了问题。我正在使用 spark 1.1.0json4sCassandra Spark Connector。抛出的异常是:

java.io.NotSerializableException: org.json4s.DefaultFormats

检查 DefaultFormats 伴生对象,以及这个 stack 问题,很明显 DefaultFormats 无法序列化。现在的问题是怎么办。

我可以看到这个 ticket 通过添加关键字 t运行sient 显然已经在 spark 代码库中解决了这个问题,但我不确定如何或在哪里应用它我的情况。是否只在执行程序上实例化 DefaultFormats class 以避免一起序列化的解决方案?人们正在使用 scala/spark 的另一个 JSON 解析库吗?我最初尝试单独使用 jackson,但是 运行 遇到了一些我无法轻松解决的带有注释的错误,并且 json4s 开箱即用。这是我的代码:

import org.json4s._
import org.json4s.jackson.JsonMethods._
implicit val formats = DefaultFormats

val count = rdd.map(r => checkUa(r._2, r._1)).reduce((x, y) => x + y) 

我在 checkUa 函数中进行 json 解析。我试着让计数变得懒惰,希望它能以某种方式延迟执行,但没有效果。也许在 checkUA 中移动隐式 val?非常感谢任何建议。

这已在 an open ticket with json4s 中得到解答。解决方法是将 implicit 声明放在函数

val count = rdd
               .map(r => {implicit val formats = DefaultFormats; checkUa(r._2, r._1)})
               .reduce((x, y) => x + y) 

当我将 implicit val formats = ... 声明放在包含解析的方法中,而不是在 class(对象)上声明它时,我遇到了同样的错误。

所以这会引发错误:

object Application {

  //... Lots of other code here, which eventually calls 
  // setupStream(...)

  def setupStream(streamingContext: StreamingContext,
                          brokers: String,
                          topologyTopicName: String) = {
    implicit val formats = DefaultFormats
    _createDStream(streamingContext, brokers, topologyTopicName)
      // Remove the message key, which is always null in our case
      .map(_._2)
      .map((json: String) => parse(json).camelizeKeys
        .extract[Record[TopologyMetadata, Unused]])
      .print()
}

但这样就好了:

object Application {

  implicit val formats = DefaultFormats

  //... Lots of other code here, which eventually calls 
  // setupStream(...)

  def setupStream(streamingContext: StreamingContext,
                          brokers: String,
                          topologyTopicName: String) = {
    _createDStream(streamingContext, brokers, topologyTopicName)
      // Remove the message key, which is always null in our case
      .map(_._2)
      .map((json: String) => parse(json).camelizeKeys
        .extract[Record[TopologyMetadata, Unused]])
      .print()
}