Array of JSON to Dataframe in Spark received by Kafka
I am writing a Spark application in Scala using Spark Structured Streaming that receives data from Kafka in JSON format. The application can receive one or several JSON objects formatted like this:
[{"key1":"value1","key2":"value2"},{"key1":"value1","key2":"value2"},...,{"key1":"value1","key2":"value2"}]
I tried to define a StructType such as:
var schema = StructType(
  Array(
    StructField("key1", DataTypes.StringType),
    StructField("key2", DataTypes.StringType)
  ))
But it doesn't work. My actual code for parsing the JSON:
var data = (this.stream).getStreamer().load()
  .selectExpr("CAST (value AS STRING) as json")
  .select(from_json($"json", schema = schema).as("data"))
I would like to get this JSON object in a dataframe like:
+----------+---------+
| key1| key2|
+----------+---------+
| value1| value2|
| value1| value2|
........
| value1| value2|
+----------+---------+
Can someone help me? Thanks!
Since the string you are passing in is an Array of JSON, one way is to write a UDF to parse the Array and then explode the parsed Array. Below is the complete code with an explanation of each step. I have written it for batch, but it can be used for streaming with minimal changes; a sketch of the streaming variant follows the code.
import org.apache.spark.sql.SparkSession
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

object JsonParser {

  // case class to parse the incoming JSON String
  case class JSON(key1: String, key2: String)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("JSON")
      .master("local")
      .getOrCreate()

    import spark.implicits._
    import org.apache.spark.sql.functions._

    // sample JSON array String coming from Kafka
    val str = Seq("""[{"key1":"value1","key2":"value2"},{"key1":"value3","key2":"value4"}]""")

    // UDF to parse the JSON array String into an Array of case class instances
    val jsonConverter = udf { jsonString: String =>
      val mapper = new ObjectMapper()
      mapper.registerModule(DefaultScalaModule)
      mapper.readValue(jsonString, classOf[Array[JSON]])
    }

    val df = str.toDF("json")                        // json String column
      .withColumn("array", jsonConverter($"json"))   // parse the JSON Array
      .withColumn("json", explode($"array"))         // explode the Array
      .drop("array")                                 // drop the intermediate column
      .select("json.*")                              // expand the struct into separate columns

    // display the DF
    df.show()
    //+------+------+
    //|  key1|  key2|
    //+------+------+
    //|value1|value2|
    //|value3|value4|
    //+------+------+
  }
}
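For the streaming case, the change is essentially swapping the in-memory Seq for a Kafka source and writing the result with writeStream. A minimal sketch, reusing spark and the jsonConverter UDF from the code above, and assuming a Kafka broker at localhost:9092 and a topic named events (both placeholders):
// minimal streaming sketch; broker address and topic name are placeholders
val streamDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .load()
  .selectExpr("CAST(value AS STRING) as json")          // Kafka value arrives as binary
  .withColumn("json", explode(jsonConverter($"json")))  // parse and explode in one step
  .select("json.*")

streamDf.writeStream
  .format("console")       // print each micro-batch for inspection
  .outputMode("append")
  .start()
  .awaitTermination()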
This worked fine for me in Spark 3.0.0 and Scala 2.12.10. I used schema_of_json to get the schema of the data in a format suitable for from_json, and applied explode and the * operator in the last step of the chain to expand the result accordingly.
// TO KNOW THE SCHEMA
scala> val str = Seq("""[{"key1":"value1","key2":"value2"},{"key1":"value3","key2":"value4"}]""")
str: Seq[String] = List([{"key1":"value1","key2":"value2"},{"key1":"value3","key2":"value4"}])
scala> val df = str.toDF("json")
df: org.apache.spark.sql.DataFrame = [json: string]
scala> df.show()
+--------------------+
| json|
+--------------------+
|[{"key1":"value1"...|
+--------------------+
scala> val schema = df.select(schema_of_json(df.select(col("json")).first.getString(0))).as[String].first
schema: String = array<struct<key1:string,key2:string>>
Use the resulting string as your schema, 'array<struct<key1:string,key2:string>>', as follows:
// TO PARSE THE ARRAY OF JSON's
scala> val parsedJson1 = df.selectExpr("from_json(json, 'array<struct<key1:string,key2:string>>') as parsed_json")
parsedJson1: org.apache.spark.sql.DataFrame = [parsed_json: array<struct<key1:string,key2:string>>]
scala> parsedJson1.show()
+--------------------+
| parsed_json|
+--------------------+
|[[value1, value2]...|
+--------------------+
scala> val data = parsedJson1.selectExpr("explode(parsed_json) as json").select("json.*")
data: org.apache.spark.sql.DataFrame = [key1: string, key2: string]
scala> data.show()
+------+------+
| key1| key2|
+------+------+
|value1|value2|
|value3|value4|
+------+------+
FYI, without the star expansion, the intermediate result looks like this:
scala> val data = parsedJson1.selectExpr("explode(parsed_json) as json")
data: org.apache.spark.sql.DataFrame = [json: struct<key1: string, key2: string>]
scala> data.show()
+----------------+
| json|
+----------------+
|[value1, value2]|
|[value3, value4]|
+----------------+
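If you would rather not paste the schema string by hand, from_json also accepts the schema as a column, so the discovery and parsing steps can be chained. A small sketch of that variant, under the assumption that every row shares the schema of the first row:
import org.apache.spark.sql.functions.{col, explode, from_json, lit, schema_of_json}

// derive the schema from the first row, then parse, explode, and flatten
val schemaStr = df.select(schema_of_json(df.select(col("json")).first.getString(0))).as[String].first
val flattened = df
  .select(from_json(col("json"), lit(schemaStr)).as("parsed_json"))
  .selectExpr("explode(parsed_json) as json")
  .select("json.*")
flattened.show()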
- Wrap your schema in an ArrayType; from_json will then convert the JSON data accordingly:
var schema = ArrayType(StructType(
  Array(
    StructField("key1", DataTypes.StringType),
    StructField("key2", DataTypes.StringType)
  )))
- Explode it to get each JSON array element in its own row:
val explodedDf = df.withColumn("jsonData", explode(from_json(col("value"), schema)))
  .select($"jsonData")
explodedDf.show
+----------------+
| jsonData|
+----------------+
|[value1, value2]|
|[value3, value4]|
+----------------+
- Select the JSON keys:
explodedDf.select("jsonData.*").show
+------+------+
| key1| key2|
+------+------+
|value1|value2|
|value3|value4|
+------+------+
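Tied back to the asker's original streaming code, the whole answer condenses into one chain. A minimal sketch, where df stands for the raw Kafka frame with its binary value column (the result of the asker's getStreamer().load() helper):
import org.apache.spark.sql.functions.{col, explode, from_json}
import org.apache.spark.sql.types.{ArrayType, DataTypes, StructField, StructType}

val schema = ArrayType(StructType(Array(
  StructField("key1", DataTypes.StringType),
  StructField("key2", DataTypes.StringType)
)))

// df is assumed to be the raw Kafka frame; parse, explode, and flatten in one pass
val data = df
  .selectExpr("CAST(value AS STRING) AS json")
  .select(explode(from_json(col("json"), schema)).as("jsonData"))
  .select("jsonData.*")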