使用 json4s 解析 JSON 时引发不可序列化异常
Spark non-serializable exception when parsing JSON with json4s
我 运行 在我的 spark 作业中试图解析 json 时遇到了问题。我正在使用 spark 1.1.0
、json4s
和 Cassandra Spark Connector
。抛出的异常是:
java.io.NotSerializableException: org.json4s.DefaultFormats
检查 DefaultFormats 伴生对象,以及这个 stack 问题,很明显 DefaultFormats 无法序列化。现在的问题是怎么办。
我可以看到这个 ticket 通过添加关键字 t运行sient 显然已经在 spark 代码库中解决了这个问题,但我不确定如何或在哪里应用它我的情况。是否只在执行程序上实例化 DefaultFormats class 以避免一起序列化的解决方案?人们正在使用 scala/spark 的另一个 JSON 解析库吗?我最初尝试单独使用 jackson,但是 运行 遇到了一些我无法轻松解决的带有注释的错误,并且 json4s 开箱即用。这是我的代码:
import org.json4s._
import org.json4s.jackson.JsonMethods._
implicit val formats = DefaultFormats
val count = rdd.map(r => checkUa(r._2, r._1)).reduce((x, y) => x + y)
我在 checkUa 函数中进行 json 解析。我试着让计数变得懒惰,希望它能以某种方式延迟执行,但没有效果。也许在 checkUA 中移动隐式 val?非常感谢任何建议。
这已在 an open ticket with json4s 中得到解答。解决方法是将 implicit
声明放在函数
中
val count = rdd
.map(r => {implicit val formats = DefaultFormats; checkUa(r._2, r._1)})
.reduce((x, y) => x + y)
当我将 implicit val formats = ...
声明放在包含解析的方法中,而不是在 class(对象)上声明它时,我遇到了同样的错误。
所以这会引发错误:
object Application {
//... Lots of other code here, which eventually calls
// setupStream(...)
def setupStream(streamingContext: StreamingContext,
brokers: String,
topologyTopicName: String) = {
implicit val formats = DefaultFormats
_createDStream(streamingContext, brokers, topologyTopicName)
// Remove the message key, which is always null in our case
.map(_._2)
.map((json: String) => parse(json).camelizeKeys
.extract[Record[TopologyMetadata, Unused]])
.print()
}
但这样就好了:
object Application {
implicit val formats = DefaultFormats
//... Lots of other code here, which eventually calls
// setupStream(...)
def setupStream(streamingContext: StreamingContext,
brokers: String,
topologyTopicName: String) = {
_createDStream(streamingContext, brokers, topologyTopicName)
// Remove the message key, which is always null in our case
.map(_._2)
.map((json: String) => parse(json).camelizeKeys
.extract[Record[TopologyMetadata, Unused]])
.print()
}
我 运行 在我的 spark 作业中试图解析 json 时遇到了问题。我正在使用 spark 1.1.0
、json4s
和 Cassandra Spark Connector
。抛出的异常是:
java.io.NotSerializableException: org.json4s.DefaultFormats
检查 DefaultFormats 伴生对象,以及这个 stack 问题,很明显 DefaultFormats 无法序列化。现在的问题是怎么办。
我可以看到这个 ticket 通过添加关键字 t运行sient 显然已经在 spark 代码库中解决了这个问题,但我不确定如何或在哪里应用它我的情况。是否只在执行程序上实例化 DefaultFormats class 以避免一起序列化的解决方案?人们正在使用 scala/spark 的另一个 JSON 解析库吗?我最初尝试单独使用 jackson,但是 运行 遇到了一些我无法轻松解决的带有注释的错误,并且 json4s 开箱即用。这是我的代码:
import org.json4s._
import org.json4s.jackson.JsonMethods._
implicit val formats = DefaultFormats
val count = rdd.map(r => checkUa(r._2, r._1)).reduce((x, y) => x + y)
我在 checkUa 函数中进行 json 解析。我试着让计数变得懒惰,希望它能以某种方式延迟执行,但没有效果。也许在 checkUA 中移动隐式 val?非常感谢任何建议。
这已在 an open ticket with json4s 中得到解答。解决方法是将 implicit
声明放在函数
val count = rdd
.map(r => {implicit val formats = DefaultFormats; checkUa(r._2, r._1)})
.reduce((x, y) => x + y)
当我将 implicit val formats = ...
声明放在包含解析的方法中,而不是在 class(对象)上声明它时,我遇到了同样的错误。
所以这会引发错误:
object Application {
//... Lots of other code here, which eventually calls
// setupStream(...)
def setupStream(streamingContext: StreamingContext,
brokers: String,
topologyTopicName: String) = {
implicit val formats = DefaultFormats
_createDStream(streamingContext, brokers, topologyTopicName)
// Remove the message key, which is always null in our case
.map(_._2)
.map((json: String) => parse(json).camelizeKeys
.extract[Record[TopologyMetadata, Unused]])
.print()
}
但这样就好了:
object Application {
implicit val formats = DefaultFormats
//... Lots of other code here, which eventually calls
// setupStream(...)
def setupStream(streamingContext: StreamingContext,
brokers: String,
topologyTopicName: String) = {
_createDStream(streamingContext, brokers, topologyTopicName)
// Remove the message key, which is always null in our case
.map(_._2)
.map((json: String) => parse(json).camelizeKeys
.extract[Record[TopologyMetadata, Unused]])
.print()
}