Structured Streaming如何动态解析kafka的json数据

How structured streaming dynamically parses kafka's json data

我正在尝试使用结构化流从 Kafka 读取数据。从 kafka 接收到的数据是 json 格式。 我的代码如下: 在代码中,我使用 from_json 函数将 json 转换为数据帧以供进一步处理。

val **schema**: StructType = new StructType()
    .add("time", LongType)
    .add(id", LongType)
    .add("properties",new StructType()
      .add("$app_version", StringType)
      .
      .
    )
val df: DataFrame = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers","...")
    .option("subscribe","...")
    .load()
    .selectExpr("CAST(value AS STRING) as value")
    .select(from_json(col("value"), **schema**))

我的问题是,如果增加字段, 我无法停止 spark 程序来手动添加这些字段, 那么如何动态解析这些字段,我尝试了 schema_of_json(), 只能取第一行来推断字段类型,不适合多层嵌套结构json data.

My problem is that if the field is increased, I can't stop the spark program to manually add these fields, then how can I parse these fields dynamically

在 Spark Structured Streaming(甚至 Spark SQL)中开箱即用是不可能的。不过有几个解决方案。

更改代码中的架构并恢复流式查询

您只需停止流式查询,更改代码以匹配当前模式,然后恢复它。在 Spark Structured Streaming 中可以使用支持从检查点恢复的数据源。 Kafka数据源确实支持。

用户定义函数 (UDF)

您可以编写一个用户定义函数 (UDF) 来为您执行此动态 JSON 解析。这也是最简单的选择之一。

新数据源(MicroBatchReader)

另一种选择是创建内置 Kafka 数据源的扩展,该扩展将执行动态 JSON 解析(类似于 Kafka 反序列化器)。这需要更多的开发,但肯定是可行的。