从 spark-shell 中的分布式操作调用“JValue.extract”时出错

Error calling `JValue.extract` from distributed operations in spark-shell

我正在尝试使用 Spark 中 json4s 的 case class 提取功能, 即调用 jvalue.extract[MyCaseClass]。如果我将 JValue 对象放入 master 并在那里进行提取,它工作正常,但相同的调用在 worker 中失败:

import org.json4s._
import org.json4s.jackson.JsonMethods._
import scala.util.{Try, Success, Failure}

val sqx = sqlContext

val data = sc.textFile(inpath).coalesce(2000)

case class PageView(
 client:  Option[String]
)

def extract(json: JValue) = {
  implicit def formats = org.json4s.DefaultFormats
  Try(json.extract[PageView]).toOption
}

val json = data.map(parse(_)).sample(false, 1e-6).cache()

// count initial inputs
val raw = json.count 


// count successful extractions locally -- same value as above
val loc = json.toLocalIterator.flatMap(extract).size

// distributed count -- always zero
val dist = json.flatMap(extract).count // always returns zero

// this throws  "org.json4s.package$MappingException: Parsed JSON values do not match with class constructor"
json.map(x => {implicit def formats = org.json4s.DefaultFormats; x.extract[PageView]}).count

Formats 的隐式在 extract 函数中本地定义,因为 DefaultFormats 不可序列化并且在顶层定义它导致它被序列化以传输给工作人员而不是在那里构建.我认为问题仍然与DefaultFormats的远程初始化有关,但我不确定它是什么

当我直接调用 extract 方法而不是我的 extract 函数时,就像上一个例子一样,它不再抱怨序列化,而是抛出一个错误 JSON 与预期的结构不匹配。

分配给工作人员后,如何让提取工作?


编辑

@WesleyMiao复现了该问题,发现是spark-shell特有的。他报告说此代码可作为独立应用程序使用。

按照建议here,我会将对象创建移动到地图中。 IE。我会有函数 createPageViews 将提取作为内部函数并将 createPageViews 传递给工人。

更准确地说,我会使用 mapPartitions 而不是 map - 所以它必须每个分区只调用一次 createPageViews(它是内部函数定义部分) - 而不是每个记录一次。

运行在 spark-shell 中使用您的代码时,我遇到了与您相同的异常。然而,当我把你的代码变成一个真正的 spark 应用程序并将其提交到一个独立的 spark 集群时,我无一例外地得到了预期的结果。

下面是我放入一个简单的 spark 应用程序中的代码。

val data = sc.parallelize(Seq("""{"client":"Michael"}""", """{"client":"Wesley"}"""))

val json = data.map(parse(_))

val dist = json.mapPartitions { jsons =>
  implicit val formats = org.json4s.DefaultFormats
  jsons.map(_.extract[PageView])
}

dist.collect() foreach println

当我 运行 使用 spark-submit 时,我得到了以下结果。

PageView(Some(Michael))                                                                                                                                       
PageView(Some(Wesley))

我也确定它 运行 不是 "local[*]" 模式。

现在我怀疑我们在 spark-shell 中 运行ning 时出现异常的原因与 spark-shell 中的 class PageView 定义有关以及 spark-shell 如何将其序列化/分发给执行者。