使用 Spark Structured Streaming 读取带有模式的 Kafka Connect JSONConverter 消息

Reading Kafka Connect JSONConverter messages with schema using Spark Structured Streaming

我正在尝试读取来自 Kafka Topic 的消息。消息格式如下(示例格式):

{"schema":{"type":"struct","name":"emp_table","fields":[{"field":"emp_id","type":"string"},{"field":"emp_name","type":"String"},{"field":"city","type":"string"},{"field":"emp_sal","type":"string"},{"field":"manager_name","type":"string"}]},"payload":{"emp_id":"1","emp_name":"abc","city":"NYK","emp_sal":"100000","manager_name":"xyz"}}

此外,请注意主题有来自不同 table 的消息,而不仅仅是 1 table。

我想要实现的是使用 Spark Structured Streaming 从 Kafka Topic 读取上面的消息,并创建一个数据框,其列名及其值均来自 JSON 消息本身。

我不想使用 case class 或 StructType 显式定义模式。

我试过这个:

val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", brokers).option("subscribe", "topic1").option("startingOffsets", "earliest").load()

val y=df.select(get_json_object(($"value"), "$.payload").alias("payload")

当我查看 Y(这是一个数据帧)时,它显示为 1 列,有效负载下的值为该列中的 JSON。

如何获取数据框中的单个列?我没有做到这一点。

(再次重申,我不能对模式部分使用通用案例 class 或 StructType,因为通过 Kafka 消息的消息来自不同的 table,所以我想要更多从 JSON 本身在 运行.)

选项 1:更改 Kafka Connect 源以设置 value.converter.schemas.enable=false。这只会给你(开始时未包装的有效载荷),然后你可以跳到下面 post。

否则,在剥离 Connect 架构后,您需要使用 from_json() 来应用架构

val y = df.select(get_json_object($"value", "$.payload").alias("payload"))
val z = df.select(from_json($"payload", schema))

你所有的字段都是字符串,所以看起来像

val schema: StructType = StructType(Seq(
  StructField("emp_id", StringType()),
  StructField("emp_name", StringType()),
  StructField("city", StringType()),
  StructField("emp_sal", StringType()),
  StructField("manager_name", StringType())
))

相关