Structured Streaming如何动态解析kafka的json数据
How structured streaming dynamically parses kafka's json data
我正在尝试使用结构化流从 Kafka 读取数据。从 kafka 接收到的数据是 json 格式。
我的代码如下:
在代码中,我使用 from_json 函数将 json 转换为数据帧以供进一步处理。
val **schema**: StructType = new StructType()
.add("time", LongType)
.add(id", LongType)
.add("properties",new StructType()
.add("$app_version", StringType)
.
.
)
val df: DataFrame = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers","...")
.option("subscribe","...")
.load()
.selectExpr("CAST(value AS STRING) as value")
.select(from_json(col("value"), **schema**))
我的问题是,如果增加字段,
我无法停止 spark 程序来手动添加这些字段,
那么如何动态解析这些字段,我尝试了 schema_of_json(),
只能取第一行来推断字段类型,不适合多层嵌套结构json data.
My problem is that if the field is increased, I can't stop the spark program to manually add these fields, then how can I parse these fields dynamically
在 Spark Structured Streaming(甚至 Spark SQL)中开箱即用是不可能的。不过有几个解决方案。
更改代码中的架构并恢复流式查询
您只需停止流式查询,更改代码以匹配当前模式,然后恢复它。在 Spark Structured Streaming 中可以使用支持从检查点恢复的数据源。 Kafka数据源确实支持。
用户定义函数 (UDF)
您可以编写一个用户定义函数 (UDF) 来为您执行此动态 JSON 解析。这也是最简单的选择之一。
新数据源(MicroBatchReader)
另一种选择是创建内置 Kafka 数据源的扩展,该扩展将执行动态 JSON 解析(类似于 Kafka 反序列化器)。这需要更多的开发,但肯定是可行的。
我正在尝试使用结构化流从 Kafka 读取数据。从 kafka 接收到的数据是 json 格式。 我的代码如下: 在代码中,我使用 from_json 函数将 json 转换为数据帧以供进一步处理。
val **schema**: StructType = new StructType()
.add("time", LongType)
.add(id", LongType)
.add("properties",new StructType()
.add("$app_version", StringType)
.
.
)
val df: DataFrame = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers","...")
.option("subscribe","...")
.load()
.selectExpr("CAST(value AS STRING) as value")
.select(from_json(col("value"), **schema**))
我的问题是,如果增加字段, 我无法停止 spark 程序来手动添加这些字段, 那么如何动态解析这些字段,我尝试了 schema_of_json(), 只能取第一行来推断字段类型,不适合多层嵌套结构json data.
My problem is that if the field is increased, I can't stop the spark program to manually add these fields, then how can I parse these fields dynamically
在 Spark Structured Streaming(甚至 Spark SQL)中开箱即用是不可能的。不过有几个解决方案。
更改代码中的架构并恢复流式查询
您只需停止流式查询,更改代码以匹配当前模式,然后恢复它。在 Spark Structured Streaming 中可以使用支持从检查点恢复的数据源。 Kafka数据源确实支持。
用户定义函数 (UDF)
您可以编写一个用户定义函数 (UDF) 来为您执行此动态 JSON 解析。这也是最简单的选择之一。
新数据源(MicroBatchReader)
另一种选择是创建内置 Kafka 数据源的扩展,该扩展将执行动态 JSON 解析(类似于 Kafka 反序列化器)。这需要更多的开发,但肯定是可行的。