将 JSON 写入文件时出现序列化错误

Serialization error while writing JSON to file

我正在读取文本文件并在每次迭代中创建 Json 个对象 JsValues。我想在每次迭代时将它们保存到一个文件中。我正在使用 Play Framework 创建 JSON 个对象。

class Cleaner {
  def getDocumentData() = {
     for (i <- no_of_files) {
     .... do something ...
         some_json = Json.obj("text" -> LARGE_TEXT)
         final_json = Json.stringify(some_json)
         //save final_json here to a file
     }
  }
}

我尝试使用 PrintWriter 来保存 json 但我得到 Exception in thread "main" org.apache.spark.SparkException: Task not serializable 作为错误。

我该如何纠正?还是有其他方法可以保存 JsValue?

更新:

我读到在这种情况下必须使用特征 serializable。我有以下功能:

class Cleaner() extends Serializable {
  def readDocumentData() {
    val conf = new SparkConf()
      .setAppName("linkin_spark")
      .setMaster("local[2]")
      .set("spark.executor.memory", "1g")
      .set("spark.rdd.compress", "true")
      .set("spark.storage.memoryFraction", "1")

    val sc = new SparkContext(conf)

    val temp = sc.wholeTextFiles("text_doc.dat)
    val docStartRegex = """<DOC>""".r
    val docEndRegex = """</DOC>""".r
    val docTextStartRegex = """<TEXT>""".r
    val docTextEndRegex = """</TEXT>""".r
    val docnoRegex = """<DOCNO>(.*?)</DOCNO>""".r
    val writer = new PrintWriter(new File("test.json"))

    for (fileData <- temp) {
      val filename = fileData._1
      val content: String = fileData._2
      println(s"For $filename, the data is:")
      var startDoc = false // This is for the
      var endDoc = false // whole file
      var startText = false //
      var endText = false //
      var textChunk = new ListBuffer[String]()
      var docID: String = ""
      var es_json: JsValue = Json.obj()

      for (current_line <- content.lines) {
        current_line match {
          case docStartRegex(_*) => {
            startDoc = true
            endText = false
            endDoc = false
          }
          case docnoRegex(group) => {
            docID = group.trim
          }
          case docTextStartRegex(_*) => {
            startText = true
          }
          case docTextEndRegex(_*) => {
            endText = true
            startText = false
          }
          case docEndRegex(_*) => {
            endDoc = true
            startDoc = false
            es_json = Json.obj(
              "_id" -> docID,
              "_source" -> Json.obj(
                "text" -> textChunk.mkString(" ")
              )
            )
            writer.write(es_json.toString())
            println(es_json.toString())
            textChunk.clear()
          }
          case _ => {
            if (startDoc && !endDoc && startText) {
              textChunk += current_line.trim
            }
          }
        }
      }
    }
    writer.close()
  }
}

这是我添加了特征的函数,但我仍然遇到同样的异常。 我重写了一个较小的版本:

def foo() {
    val conf = new SparkConf()
      .setAppName("linkin_spark")
      .setMaster("local[2]")
      .set("spark.executor.memory", "1g")
      .set("spark.rdd.compress", "true")
      .set("spark.storage.memoryFraction", "1")
    val sc = new SparkContext(conf)

    var es_json: JsValue = Json.obj()
    val writer = new PrintWriter(new File("test.json"))
    for (i <- 1 to 10) {
      es_json = Json.obj(
        "_id" -> i,
        "_source" -> Json.obj(
          "text" -> "Eureka!"
        )
      )
      println(es_json)
      writer.write(es_json.toString() + "\n")
    }
    writer.close()
  }

这个函数在有和没有 serializable 的情况下都能正常工作。我不明白发生了什么事?

编辑:第一个答案在 phone.

不是你的主 class 需要序列化,而是你在 rdd 处理循环中使用的 class 在这种情况下 for (fileData <- temp) 它需要可序列化,因为 spark 数据位于可能位于多台计算机上的多个分区上。因此,您应用于此数据的功能需要是可序列化的,以便您可以将它们发送到另一台计算机,在那里它们将被并行执行。 PrintWriter 无法序列化,因为它引用的文件只能从原始计算机获得。因此序列化错误。

在初始化spark进程的电脑上写入你的数据。你需要把整个集群的数据拿到你的机器上然后写入。

为此,您可以收集结果。 rdd.collect() 这将从集群中获取所有数据并将其放入您的驱动程序线程内存中。然后你可以使用 PrintWriter.

将它写入文件

像这样:

temp.flatMap { fileData =>
  val filename = fileData._1
  val content: String = fileData._2
  println(s"For $filename, the data is:")
  var startDoc = false // This is for the
  var endDoc = false // whole file
  var startText = false //
  var endText = false //
  var textChunk = new ListBuffer[String]()
  var docID: String = ""
  var es_json: JsValue = Json.obj()

  var results = ArrayBuffer[String]()

  for (current_line <- content.lines) {
    current_line match {
      case docStartRegex(_*) => {
        startDoc = true
        endText = false
        endDoc = false
      }
      case docnoRegex(group) => {
        docID = group.trim
      }
      case docTextStartRegex(_*) => {
        startText = true
      }
      case docTextEndRegex(_*) => {
        endText = true
        startText = false
      }
      case docEndRegex(_*) => {
        endDoc = true
        startDoc = false
        es_json = Json.obj(
          "_id" -> docID,
          "_source" -> Json.obj(
            "text" -> textChunk.mkString(" ")
          )
        )
        results.append(es_json.toString())
        println(es_json.toString())
        textChunk.clear()
      }
      case _ => {
        if (startDoc && !endDoc && startText) {
          textChunk += current_line.trim
        }
      }
    }
  }

  results
}
.collect()
.foreach(es_json => writer.write(es_json))

如果结果对于驱动程序线程内存来说太大,您可以使用 saveAsTextFile 函数将每个分区流式传输到您的驱动器。在第二种情况下,您作为参数提供的路径将被放入一个文件夹中,并且您的 rdd 的每个分区将被写入其中的一个编号文件。

像这样:

temp.flatMap { fileData =>
  val filename = fileData._1
  val content: String = fileData._2
  println(s"For $filename, the data is:")
  var startDoc = false // This is for the
  var endDoc = false // whole file
  var startText = false //
  var endText = false //
  var textChunk = new ListBuffer[String]()
  var docID: String = ""
  var es_json: JsValue = Json.obj()

  var results = ArrayBuffer[String]()

  for (current_line <- content.lines) {
    current_line match {
      case docStartRegex(_*) => {
        startDoc = true
        endText = false
        endDoc = false
      }
      case docnoRegex(group) => {
        docID = group.trim
      }
      case docTextStartRegex(_*) => {
        startText = true
      }
      case docTextEndRegex(_*) => {
        endText = true
        startText = false
      }
      case docEndRegex(_*) => {
        endDoc = true
        startDoc = false
        es_json = Json.obj(
          "_id" -> docID,
          "_source" -> Json.obj(
            "text" -> textChunk.mkString(" ")
          )
        )
        results.append(es_json.toString())
        println(es_json.toString())
        textChunk.clear()
      }
      case _ => {
        if (startDoc && !endDoc && startText) {
          textChunk += current_line.trim
        }
      }
    }
  }

  results
}
.saveAsTextFile("test.json")