Convert JSON objects to RDD

I don't know if this question is a duplicate, but none of the answers I've come across seem to work for me (maybe I'm doing something wrong).

I have a case class defined as follows:

case class myRec(
                 time: String,
                 client_title: String,
                 made_on_behalf: Double,
                 country: String,
                 email_address: String,
                 phone: String)

and a sample JSON file containing records or objects of the form

[{...}{...}{...}...] 

[{"time": "2015-05-01 02:25:47",
"client_title": "Mr.",
"made_on_behalf": 0,
"country": "Brussel",
"email_address": "15e29034@gmail.com"},
{"time": "2015-05-01 04:15:03",
"client_title": "Mr.",
"made_on_behalf": 0,
"country": "Bundesliga",
"email_address": "aae665d95c5d630@aol.com"},
{"time": "2015-05-01 06:29:18",
"client_title": "Mr.",
"made_on_behalf": 0,
"country": "Japan",
"email_address": "fef412c714ff@yahoo.com"}...]

My build.sbt contains libraryDependencies += "com.owlike" % "genson-scala_2.11" % "1.3" and scalaVersion := "2.11.7".
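
For reference, a minimal sketch of how those build.sbt entries might be laid out on separate lines (assuming Spark itself is already on the classpath):

scalaVersion := "2.11.7"

libraryDependencies += "com.owlike" % "genson-scala_2.11" % "1.3"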

and I have a Scala function defined like this:

//PS: Other imports already made
import com.owlike.genson.defaultGenson._

//PS: Spark context already defined
def prepData(infile: String): RDD[myRec] = {
  val input = sc.textFile(infile)

  // Read JSON data into my record case class
  input.mapPartitions( records =>
    records.map( record => fromJson[myRec](record))
  )
}

I am calling the function like this:

prepData("file://path/to/abc.json")

Is there a way to do this, or is there another JSON library I can use to convert to an RDD?

I also tried this, and it didn't seem to work either:

Using ScalaObjectMapper

PS: I don't want to go through Spark SQL to process the JSON file.

Thanks!

Jyd, not using Spark SQL for JSON is an interesting choice, but it's very doable. There is an example of how to do this in the examples for the Learning Spark book (disclaimer: I am one of the co-authors, so a little biased). The examples are on GitHub at https://github.com/databricks/learning-spark, but here is the relevant code snippet:

case class Person(name: String, lovesPandas: Boolean) // Note: must be a top level class

object BasicParseJsonWithJackson {

  def main(args: Array[String]) {
    if (args.length < 3) {
      println("Usage: [sparkmaster] [inputfile] [outputfile]")
      sys.exit(1)
    }
    val master = args(0)
    val inputFile = args(1)
    val outputFile = args(2)
    val sc = new SparkContext(master, "BasicParseJsonWithJackson", System.getenv("SPARK_HOME"))
    val input = sc.textFile(inputFile)

    // Parse it into a specific case class. We use mapPartitions because:
    // (a) ObjectMapper is not serializable so we either create a singleton object encapsulating ObjectMapper
    //     on the driver and have to send data back to the driver to go through the singleton object.
    //     Alternatively we can let each node create its own ObjectMapper but that's expensive in a map
    // (b) To solve for creating an ObjectMapper on each node without being too expensive we create one per
    //     partition with mapPartitions. Solves serialization and object creation performance hit.
    val result = input.mapPartitions(records => {
        // mapper object created on each executor node
        val mapper = new ObjectMapper with ScalaObjectMapper
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
        mapper.registerModule(DefaultScalaModule)
        // We use flatMap to handle errors
        // by returning an empty list (None) if we encounter an issue and a
        // list with one element if everything is ok (Some(_)).
        records.flatMap(record => {
          try {
            Some(mapper.readValue(record, classOf[Person]))
          } catch {
            case e: Exception => None
          }
        })
    }, true)
    result.filter(_.lovesPandas).mapPartitions(records => {
      val mapper = new ObjectMapper with ScalaObjectMapper
      mapper.registerModule(DefaultScalaModule)
      records.map(mapper.writeValueAsString(_))
    })
      .saveAsTextFile(outputFile)
    }
}

Note that this uses Jackson (specifically the "com.fasterxml.jackson.core" % "jackson-databind" % "2.3.3" and "com.fasterxml.jackson.module" % "jackson-module-scala_2.10" % "2.3.3" dependencies).
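
If you are pulling these in with sbt, the corresponding entries would look roughly like this (same versions as quoted above; adjust the _2.10 suffix to match your Scala version):

libraryDependencies ++= Seq(
  "com.fasterxml.jackson.core" % "jackson-databind" % "2.3.3",
  "com.fasterxml.jackson.module" % "jackson-module-scala_2.10" % "2.3.3"
)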

I just noticed that your question had some sample input, and as @zero323 pointed out, line-by-line parsing isn't going to work. Instead you would do:

    val input = sc.wholeTextFiles(inputFile).map(_._2)

    // Parse it into a specific case class. We use mapPartitions because:
    // (a) ObjectMapper is not serializable so we either create a singleton object encapsulating ObjectMapper
    //     on the driver and have to send data back to the driver to go through the singleton object.
    //     Alternatively we can let each node create its own ObjectMapper but that's expensive in a map
    // (b) To solve for creating an ObjectMapper on each node without being too expensive we create one per
    //     partition with mapPartitions. Solves serialization and object creation performance hit.
    val result = input.mapPartitions(records => {
        // mapper object created on each executor node
        val mapper = new ObjectMapper with ScalaObjectMapper
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
        mapper.registerModule(DefaultScalaModule)
        // We use flatMap to handle errors
        // by returning an empty list (None) if we encounter an issue and
        // the parsed list of records if everything is ok.
        records.flatMap(record => {
          try {
            mapper.readValue(record, classOf[List[Person]])
          } catch {
            case e: Exception => None
          }
        })
    })

For fun, you can try splitting individual documents with a specific delimiter. While this won't work for complex nested documents, it should handle the sample input without using wholeTextFiles:

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.conf.Configuration
import net.liftweb.json.{parse, JObject, JField, JString, JInt}

case class MyRec(
  time: String,
  client_title: String,
  made_on_behalf: Double,
  country: String,
  email_address: String)

@transient val conf = new Configuration
conf.set("textinputformat.record.delimiter", "},\n{")

def clean(s: String) = {
   val p = """(?s)\[?\{?(.*?)\}?\]?""".r
   s match {
     case p(x) => Some(s"{$x}")
     case _ => None
   }
}

def toRec(os: Option[String]) = {
  os match {
    case Some(s) => 
      for {
        JObject(o) <- parse(s);
        JField("time", JString(time)) <- o;
        JField("client_title", JString(client_title)) <- o;
        JField("made_on_behalf", JInt(made_on_behalf)) <- o
        JField("country", JString(country)) <- o;
        JField("email_address", JString(email)) <- o
      } yield MyRec(time, client_title, made_on_behalf.toDouble, country, email)
    case _ => Nil
  }
}

val records = sc.newAPIHadoopFile("some.json",
    classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf)
      .map{case (_, txt) => clean(txt.toString)}
      .flatMap(toRec)
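
As a quick sanity check (just a sketch, assuming the path and delimiter above match your actual data), you can pull a few parsed records back to the driver:

// Peek at the first few parsed records
records.take(3).foreach(println)

// Or count how many records there are per country
records.map(r => (r.country, 1L)).reduceByKey(_ + _).collect().foreach(println)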