如何在 Spark 2 Scala 中将行转换为 json
How to convert Row to json in Spark 2 Scala
是否有一种简单的方法可以将给定的 Row 对象转换为 json?
发现有关将整个 Dataframe 转换为 json 输出的信息:
但我只想将一行转换为 json。
这是我正在尝试做的伪代码。
更准确地说,我正在阅读 json 作为 Dataframe 中的输入。
我正在生成一个主要基于列的新输出,但是有一个 json 字段用于所有不适合列的信息。
我的问题是编写此函数的最简单方法是什么:convertRowToJson()
def convertRowToJson(row: Row): String = ???
def transformVenueTry(row: Row): Try[Venue] = {
Try({
val name = row.getString(row.fieldIndex("name"))
val metadataRow = row.getStruct(row.fieldIndex("meta"))
val score: Double = calcScore(row)
val combinedRow: Row = metadataRow ++ ("score" -> score)
val jsonString: String = convertRowToJson(combinedRow)
Venue(name = name, json = jsonString)
})
}
Psidom 的解决方案:
def convertRowToJSON(row: Row): String = {
val m = row.getValuesMap(row.schema.fieldNames)
JSONObject(m).toString()
}
仅当行只有一层而不是嵌套行时才有效。这是架构:
StructType(
StructField(indicator,StringType,true),
StructField(range,
StructType(
StructField(currency_code,StringType,true),
StructField(maxrate,LongType,true),
StructField(minrate,LongType,true)),true))
也尝试了 Artem 的建议,但没有编译:
def row2DataFrame(row: Row, sqlContext: SQLContext): DataFrame = {
val sparkContext = sqlContext.sparkContext
import sparkContext._
import sqlContext.implicits._
import sqlContext._
val rowRDD: RDD[Row] = sqlContext.sparkContext.makeRDD(row :: Nil)
val dataFrame = rowRDD.toDF() //XXX does not compile
dataFrame
}
本质上,您可以拥有一个只包含一行的数据框。因此,您可以尝试过滤您的初始数据帧,然后将其解析为 json.
可以使用getValuesMap
将行对象转换成Map再转换JSON:
import scala.util.parsing.json.JSONObject
import org.apache.spark.sql._
val df = Seq((1,2,3),(2,3,4)).toDF("A", "B", "C")
val row = df.first() // this is an example row object
def convertRowToJSON(row: Row): String = {
val m = row.getValuesMap(row.schema.fieldNames)
JSONObject(m).toString()
}
convertRowToJSON(row)
// res46: String = {"A" : 1, "B" : 2, "C" : 3}
JSon 有架构,但行没有架构,因此您需要在行上应用架构并转换为 JSon。以下是您的操作方法。
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
def convertRowToJson(row: Row): String = {
val schema = StructType(
StructField("name", StringType, true) ::
StructField("meta", StringType, false) :: Nil)
return sqlContext.applySchema(row, schema).toJSON
}
我结合了来自 Artem、KiranM 和 Psidom 的建议。做了很多尝试和错误,并提出了我测试嵌套结构的解决方案:
def row2Json(row: Row, sqlContext: SQLContext): String = {
import sqlContext.implicits
val rowRDD: RDD[Row] = sqlContext.sparkContext.makeRDD(row :: Nil)
val dataframe = sqlContext.createDataFrame(rowRDD, row.schema)
dataframe.toJSON.first
}
此解决方案有效,但仅在 运行 处于驱动程序模式时有效。
我需要读取 json 输入并生成 json 输出。
大多数字段都是单独处理的,但是需要保留一些 json 个子对象。
当 Spark 读取数据帧时,它会将记录转换为行。 Row 是一个类似 json 的结构。可以转换并写出 json。
但我需要将一些子 json 结构提取到字符串中以用作新字段。
可以这样做:
dataFrameWithJsonField = dataFrame.withColumn("address_json", to_json($"location.address"))
location.address
是传入的基于 json 的数据帧的子 json 对象的路径。 address_json
是该对象的列名,转换为 json.
的字符串版本
to_json
在 Spark 2.1 中实现。
如果使用 json4s address_json 生成它的输出 json 应该被解析为 AST 表示,否则输出 json 将具有 address_json 部分逃脱了。
注意 scala class scala.util.parsing.json.JSONObject 已弃用且不支持空值。
@deprecated("This class will be removed.", "2.11.0")
"JSONFormat.defaultFormat doesn't handle null values"
我遇到了同样的问题,我有带有规范模式(无数组)的镶木地板文件,我只想获取 json 事件。我做了如下操作,它似乎工作得很好(Spark 2.1):
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import scala.util.parsing.json.JSONFormat.ValueFormatter
import scala.util.parsing.json.{JSONArray, JSONFormat, JSONObject}
def getValuesMap[T](row: Row, schema: StructType): Map[String,Any] = {
schema.fields.map {
field =>
try{
if (field.dataType.typeName.equals("struct")){
field.name -> getValuesMap(row.getAs[Row](field.name), field.dataType.asInstanceOf[StructType])
}else{
field.name -> row.getAs[T](field.name)
}
}catch {case e : Exception =>{field.name -> null.asInstanceOf[T]}}
}.filter(xy => xy._2 != null).toMap
}
def convertRowToJSON(row: Row, schema: StructType): JSONObject = {
val m: Map[String, Any] = getValuesMap(row, schema)
JSONObject(m)
}
//I guess since I am using Any and not nothing the regular ValueFormatter is not working, and I had to add case jmap : Map[String,Any] => JSONObject(jmap).toString(defaultFormatter)
val defaultFormatter : ValueFormatter = (x : Any) => x match {
case s : String => "\"" + JSONFormat.quoteString(s) + "\""
case jo : JSONObject => jo.toString(defaultFormatter)
case jmap : Map[String,Any] => JSONObject(jmap).toString(defaultFormatter)
case ja : JSONArray => ja.toString(defaultFormatter)
case other => other.toString
}
val someFile = "s3a://bucket/file"
val df: DataFrame = sqlContext.read.load(someFile)
val schema: StructType = df.schema
val jsons: Dataset[JSONObject] = df.map(row => convertRowToJSON(row, schema))
如果你正在遍历一个数据框,你可以直接将数据框转换为一个新的数据框,里面有 json 对象并迭代那个
val df_json = df.toJSON
是否有一种简单的方法可以将给定的 Row 对象转换为 json?
发现有关将整个 Dataframe 转换为 json 输出的信息:
但我只想将一行转换为 json。 这是我正在尝试做的伪代码。
更准确地说,我正在阅读 json 作为 Dataframe 中的输入。 我正在生成一个主要基于列的新输出,但是有一个 json 字段用于所有不适合列的信息。
我的问题是编写此函数的最简单方法是什么:convertRowToJson()
def convertRowToJson(row: Row): String = ???
def transformVenueTry(row: Row): Try[Venue] = {
Try({
val name = row.getString(row.fieldIndex("name"))
val metadataRow = row.getStruct(row.fieldIndex("meta"))
val score: Double = calcScore(row)
val combinedRow: Row = metadataRow ++ ("score" -> score)
val jsonString: String = convertRowToJson(combinedRow)
Venue(name = name, json = jsonString)
})
}
Psidom 的解决方案:
def convertRowToJSON(row: Row): String = {
val m = row.getValuesMap(row.schema.fieldNames)
JSONObject(m).toString()
}
仅当行只有一层而不是嵌套行时才有效。这是架构:
StructType(
StructField(indicator,StringType,true),
StructField(range,
StructType(
StructField(currency_code,StringType,true),
StructField(maxrate,LongType,true),
StructField(minrate,LongType,true)),true))
也尝试了 Artem 的建议,但没有编译:
def row2DataFrame(row: Row, sqlContext: SQLContext): DataFrame = {
val sparkContext = sqlContext.sparkContext
import sparkContext._
import sqlContext.implicits._
import sqlContext._
val rowRDD: RDD[Row] = sqlContext.sparkContext.makeRDD(row :: Nil)
val dataFrame = rowRDD.toDF() //XXX does not compile
dataFrame
}
本质上,您可以拥有一个只包含一行的数据框。因此,您可以尝试过滤您的初始数据帧,然后将其解析为 json.
可以使用getValuesMap
将行对象转换成Map再转换JSON:
import scala.util.parsing.json.JSONObject
import org.apache.spark.sql._
val df = Seq((1,2,3),(2,3,4)).toDF("A", "B", "C")
val row = df.first() // this is an example row object
def convertRowToJSON(row: Row): String = {
val m = row.getValuesMap(row.schema.fieldNames)
JSONObject(m).toString()
}
convertRowToJSON(row)
// res46: String = {"A" : 1, "B" : 2, "C" : 3}
JSon 有架构,但行没有架构,因此您需要在行上应用架构并转换为 JSon。以下是您的操作方法。
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
def convertRowToJson(row: Row): String = {
val schema = StructType(
StructField("name", StringType, true) ::
StructField("meta", StringType, false) :: Nil)
return sqlContext.applySchema(row, schema).toJSON
}
我结合了来自 Artem、KiranM 和 Psidom 的建议。做了很多尝试和错误,并提出了我测试嵌套结构的解决方案:
def row2Json(row: Row, sqlContext: SQLContext): String = {
import sqlContext.implicits
val rowRDD: RDD[Row] = sqlContext.sparkContext.makeRDD(row :: Nil)
val dataframe = sqlContext.createDataFrame(rowRDD, row.schema)
dataframe.toJSON.first
}
此解决方案有效,但仅在 运行 处于驱动程序模式时有效。
我需要读取 json 输入并生成 json 输出。 大多数字段都是单独处理的,但是需要保留一些 json 个子对象。
当 Spark 读取数据帧时,它会将记录转换为行。 Row 是一个类似 json 的结构。可以转换并写出 json。
但我需要将一些子 json 结构提取到字符串中以用作新字段。
可以这样做:
dataFrameWithJsonField = dataFrame.withColumn("address_json", to_json($"location.address"))
location.address
是传入的基于 json 的数据帧的子 json 对象的路径。 address_json
是该对象的列名,转换为 json.
to_json
在 Spark 2.1 中实现。
如果使用 json4s address_json 生成它的输出 json 应该被解析为 AST 表示,否则输出 json 将具有 address_json 部分逃脱了。
注意 scala class scala.util.parsing.json.JSONObject 已弃用且不支持空值。
@deprecated("This class will be removed.", "2.11.0")
"JSONFormat.defaultFormat doesn't handle null values"
我遇到了同样的问题,我有带有规范模式(无数组)的镶木地板文件,我只想获取 json 事件。我做了如下操作,它似乎工作得很好(Spark 2.1):
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import scala.util.parsing.json.JSONFormat.ValueFormatter
import scala.util.parsing.json.{JSONArray, JSONFormat, JSONObject}
def getValuesMap[T](row: Row, schema: StructType): Map[String,Any] = {
schema.fields.map {
field =>
try{
if (field.dataType.typeName.equals("struct")){
field.name -> getValuesMap(row.getAs[Row](field.name), field.dataType.asInstanceOf[StructType])
}else{
field.name -> row.getAs[T](field.name)
}
}catch {case e : Exception =>{field.name -> null.asInstanceOf[T]}}
}.filter(xy => xy._2 != null).toMap
}
def convertRowToJSON(row: Row, schema: StructType): JSONObject = {
val m: Map[String, Any] = getValuesMap(row, schema)
JSONObject(m)
}
//I guess since I am using Any and not nothing the regular ValueFormatter is not working, and I had to add case jmap : Map[String,Any] => JSONObject(jmap).toString(defaultFormatter)
val defaultFormatter : ValueFormatter = (x : Any) => x match {
case s : String => "\"" + JSONFormat.quoteString(s) + "\""
case jo : JSONObject => jo.toString(defaultFormatter)
case jmap : Map[String,Any] => JSONObject(jmap).toString(defaultFormatter)
case ja : JSONArray => ja.toString(defaultFormatter)
case other => other.toString
}
val someFile = "s3a://bucket/file"
val df: DataFrame = sqlContext.read.load(someFile)
val schema: StructType = df.schema
val jsons: Dataset[JSONObject] = df.map(row => convertRowToJSON(row, schema))
如果你正在遍历一个数据框,你可以直接将数据框转换为一个新的数据框,里面有 json 对象并迭代那个
val df_json = df.toJSON