Json 字符串应作为 Kafka 主题使用,在 spark 结构化流中没有模式

Json string should be consumed as Kafka topic without schema in spark structured streaming

我需要使用 Kafka 主题,它为每个 row.I 生成动态 Json 字符串,无法解析没有模式的 Json 字符串。就我而言,架构可以是动态的。 spark.read.json 可以推断 json schema.But 它需要“数据集”或“JSON 文件”。

有什么方法可以将 Kafka 主题(值)转换为数据集?这样我就可以使用 spark.read.json,它接受数据集作为输入,它可以解析 json.

的模式

但是如果我使用下面的代码。

val klines = spark.
  readStream.
  format("kafka").
  option("kafka.bootstrap.servers", "host1:port1,host2:port2").
  option("subscribe", "topic").
  load().
  select($"value".cast("string").alias("value"))

val query = klines.
  select(from_json($"value",schema=spark.read.json(klines.as[String]).schema)).
  writeStream.
  format("console").
  start()

query.awaitTermination()

出现以下错误: 线程“main”中的异常org.apache.spark.sql.AnalysisException:必须使用 writeStream.start(); 执行流式源查询; 卡夫卡

我正在做一些中间计算,例如展平架构。但如果我这样做,就会发生同样的错误。我如何处理 spark structured streaming(scala) 中的基本中间计算?

JSON 是一个字符串。你可以只是一个字符串类型的模式。

So that i can use spark.read.json

spark.read.json 来自 文件系统

如果您想从 Kafka 读取数据,您可能需要 spark.readStream.format("kafka"),Spark 文档中对此进行了足够详细的描述

Spark 文档中的第一个示例正是这样做的

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

您在对数据进行任何类型的有用分析时都会遇到问题,但是考虑到每条记录都有可能不共享相同的字段,所以像 get_json_object 这样的操作将毫无意义

你可以说最好使用原始 Kafka 消费者 API 或 KStreams,它们不需要任何模式,但是你的问题是 不是 模式 -- 它是 反序列化 到具有可以查询的具体字段的对象类型