结构化流式传输和将嵌套数据拆分为多个数据集

Structured Streaming and Splitting nested data into multiple datasets

我正在使用 Spark 的结构化流 (2.2.1),使用 Kafka 每 60 秒从传感器接收一次数据。我在思考如何打包这个 Kafka 数据以便能够正确处理它时遇到了麻烦。

我需要能够进行一些计算,因为数据来自 Kafka。

我的问题是将来自 Kafka 的 JSON 数据解压缩到我可以使用的数据集中

数据

简化后的数据如下所示:

{
  id: 1,
  timestamp: "timestamp"
  pump: {
    current: 1.0,
    flow: 20.0
    torque: 5.0
  },
  reactors: [
    {
      id: 1,
      status: 200,
    },

    {
      id: 2,
      status: 300,
    }
  ],
  settings: {
    pumpTimer: 20.0,
    reactorStatusTimer: 200.0
  }
}

为了能够使用 this is Spark,我为其中的每一个创建了一些 case class 结构:

// First, general package
case class RawData(id: String, timestamp: String, pump: String, reactors: Array[String], settings: String)
// Each of the objects from the data
case class Pump(current: Float, flow: Float, torque: Float)
case class Reactor(id: Int, status: Int)
case class Settings(oos: Boolean, pumpTimer: Float, reactorStatusTimer: Float)

并使用以下方法生成架构:

val rawDataSchema = Encoders.product[RawData].schema

Spark Schema 的原始数据

首先,我将来自 Kafka 的 'value' 字段放入我的通用模式中:

val rawDataSet = df.select($"value" cast "string" as "json")
  .select(from_json($"json", rawDataSchema))
  .select("data.*").as[RawData]

使用这个 rawDataSet,我可以将每个单独的对象打包到数据集中。

val pump = rawDataSet.select(from_json($"pump", pumpSchema) as 'pumpData)
  .select("pumpData.*").as[Pump]

val settings = rawDataSet.select(from_json($"settings", settingsSchema) as 'settingsData)
  .select("settingsData.*").as[Settings]

这为我提供了每个 JSON 对象的漂亮干净的数据集。

处理数据

这是我的问题,例如,如果我想比较或计算设置和泵的两个数据集之间的某些值,则 JOIN 无法使用结构化流。

val joinedData = pump.join(settings)

错误:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Inner join between two streaming DataFrames/Datasets is not supported;

难道我的做法全错了吗?或者是否有任何关于处理此问题的替代方法的建议?

谢谢

我将用我现在可用的解决方案回答我自己的问题

不是为 JSON 中的每个对象制作 case classes,我可以将它们作为一个 case class 与嵌套对象这样连接在一起:

case class RawData(
  id: String, 
  timestamp: String, 
  pump: Pump, 
  reactors: Array[Reactor], 
  settings: Settings
)

case class Pump(current: Float, flow: Float, torque: Float)
case class Reactor(id: Int, status: Int)
case class Settings(oos: Boolean, pumpTimer: Float, reactorStatusTimer: Float)

要将其变成可用的数据集,我可以简单地调用

val rawDataset = df.select($"value" cast "string" as "json")
  .select(from_json($"json", Encoders.product[RawData].schema) as 'data)
  .select("data.*").as[RawData]
  .withColumn("reactor", explode($"reactors")) // Handles the array of reactors, making one row in the dataset per reactor.

处理 JSON 并将其放入我的定义模式后,我可以 select 每个特定的传感器,如下所示:

val tester = rawDataset.select($"pump.current", $”settings.pumpTimer”)

谢谢你user6910411给我指明了正确的方向