使用 Spark 结构化流处理包含嵌套实体的 JSON
Processing JSON containing nested entities using Spark Structured Streaming
我想使用 Spark Structured Streaming 从 Kafka 主题源读取嵌套数据。
我的 Scala 代码(案例 类 和 Spark 处理代码):
case class Nested(attr_int: Integer, attr_string: String, attr_float: Float, attr_timestamp: java.sql.Timestamp)
case class Parent(a_str: String, a_long: Long, a_nested: Array[Nested])
import org.apache.spark.sql.Encoders
val jsonSchema = Encoders.product[Parent].schema
val df = sparkSession
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "testnested")
.option("group.id", "testnested")
.option("key.deserializer", "org.apache.kafka.common.serialization.StringSerializer")
.option("value.deserializer", "org.apache.kafka.common.serialization.StringSerializer")
.load()
.select($"value" cast "string" as "json")
.select(from_json($"json", jsonSchema) as "data")
.select("data.*")
.withColumn("nested", explode($"a_nested"))
.select("nested.*")
.as[Nested]
.writeStream
.format("console")
.start()
.awaitTermination()
当我向 Kafka 发送数据时:
{"a_str":"Str","a_long":100,"a_nested":[{"attr_int":0,"attr_string":"nested_0","attr_float":0.0,"attr_timestamp":"2018-01-01T11:00:00.123321+02:00"},{"attr_int":1,"attr_string":"nested_1","attr_float":1.0,"attr_timestamp":"2018-02-02T12:01:01.023321+02:00"}]}
我得到结果:
+--------+-----------+----------+--------------------+
|attr_int|attr_string|attr_float| attr_timestamp|
+--------+-----------+----------+--------------------+
| 0| nested_0| 0.0|2018-01-01 13:02:...|
| 1| nested_1| 1.0|2018-02-02 14:01:...|
+--------+-----------+----------+--------------------+
现在我想要将每个嵌套项连接到父数据,f.e.:
+--------+-----------+----------+--------------------+-------+--------+
|attr_int|attr_string|attr_float| attr_timestamp| a_str | a_long |
+--------+-----------+----------+--------------------+-------+--------+
| 0| nested_0| 0.0|2018-01-01 13:02:...| Str | 100 |
| 1| nested_1| 1.0|2018-02-02 14:01:...| Str | 100 |
+--------+-----------+----------+--------------------+-------+--------+
请注意,"a_str"
和 "a_long"
是来自父实体 "Parent"
的列。
由于我不是Spark Structured Streams处理方面的专家,我想知道最"idiomatic"的方法是什么?
目前我有假设:
- 创建自定义 Kafka 值反序列化器
- 在结构化流上编写某种连接(我一直坚持),但我想这将需要更改 json 结构(f.e。在嵌套中指定一些键值
指向父数据)
- 编写自定义方法,该方法将 return 连接实体的非规范化数据并使用
flatMap
与此方法
请指教
谢谢
更新 1:为了您的方便,我在 GitHub 上创建了相应的项目:https://github.com/lospejos/spark-nested-classes-from-json
感谢 Glennie Helles Sindholt 以及其他 google 员工的帮助:
.select($"nested.*", $"a_str", $"a_long")
Github 存储库也已更新。
我想使用 Spark Structured Streaming 从 Kafka 主题源读取嵌套数据。 我的 Scala 代码(案例 类 和 Spark 处理代码):
case class Nested(attr_int: Integer, attr_string: String, attr_float: Float, attr_timestamp: java.sql.Timestamp)
case class Parent(a_str: String, a_long: Long, a_nested: Array[Nested])
import org.apache.spark.sql.Encoders
val jsonSchema = Encoders.product[Parent].schema
val df = sparkSession
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "testnested")
.option("group.id", "testnested")
.option("key.deserializer", "org.apache.kafka.common.serialization.StringSerializer")
.option("value.deserializer", "org.apache.kafka.common.serialization.StringSerializer")
.load()
.select($"value" cast "string" as "json")
.select(from_json($"json", jsonSchema) as "data")
.select("data.*")
.withColumn("nested", explode($"a_nested"))
.select("nested.*")
.as[Nested]
.writeStream
.format("console")
.start()
.awaitTermination()
当我向 Kafka 发送数据时:
{"a_str":"Str","a_long":100,"a_nested":[{"attr_int":0,"attr_string":"nested_0","attr_float":0.0,"attr_timestamp":"2018-01-01T11:00:00.123321+02:00"},{"attr_int":1,"attr_string":"nested_1","attr_float":1.0,"attr_timestamp":"2018-02-02T12:01:01.023321+02:00"}]}
我得到结果:
+--------+-----------+----------+--------------------+
|attr_int|attr_string|attr_float| attr_timestamp|
+--------+-----------+----------+--------------------+
| 0| nested_0| 0.0|2018-01-01 13:02:...|
| 1| nested_1| 1.0|2018-02-02 14:01:...|
+--------+-----------+----------+--------------------+
现在我想要将每个嵌套项连接到父数据,f.e.:
+--------+-----------+----------+--------------------+-------+--------+
|attr_int|attr_string|attr_float| attr_timestamp| a_str | a_long |
+--------+-----------+----------+--------------------+-------+--------+
| 0| nested_0| 0.0|2018-01-01 13:02:...| Str | 100 |
| 1| nested_1| 1.0|2018-02-02 14:01:...| Str | 100 |
+--------+-----------+----------+--------------------+-------+--------+
请注意,"a_str"
和 "a_long"
是来自父实体 "Parent"
的列。
由于我不是Spark Structured Streams处理方面的专家,我想知道最"idiomatic"的方法是什么?
目前我有假设:
- 创建自定义 Kafka 值反序列化器
- 在结构化流上编写某种连接(我一直坚持),但我想这将需要更改 json 结构(f.e。在嵌套中指定一些键值 指向父数据)
- 编写自定义方法,该方法将 return 连接实体的非规范化数据并使用
flatMap
与此方法
请指教
谢谢
更新 1:为了您的方便,我在 GitHub 上创建了相应的项目:https://github.com/lospejos/spark-nested-classes-from-json
感谢 Glennie Helles Sindholt 以及其他 google 员工的帮助:
.select($"nested.*", $"a_str", $"a_long")
Github 存储库也已更新。