Spark Streaming MQTT - 在数据集上应用模式
Spark Streaming MQTT - Apply schema on dataset
我正在研究 DataBricks (Spark 2.0.1-db1 (Scala 2.11)),我正在尝试使用 Spark Streaming 函数。我正在使用库 :
- spark-sql-streaming-mqtt_2.11-2.1.0-SNAPSHOT.jar(参见此处:http://bahir.apache.org/docs/spark/current/spark-sql-streaming-mqtt/)
以下命令为我提供了一个数据集:
val lines = spark.readStream
.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
.option("clientId", "sparkTest")
.option("brokerUrl", "tcp://xxx.xxx.xxx.xxx:xxx")
.option("topic", "/Name/data")
.option("localStorage", "dbfs:/models/mqttPersist")
.option("cleanSession", "true")
.load().as[(String, Timestamp)]
使用此 printSchema :
root
|-- value : string (nullable : true)
|-- timestamp : timestamp (nullable : true)
我想在我的数据集的 "value" 列上应用模式。您可以看到我的 json 架构如下。
root
|-- id : string (nullable = true)
|-- DateTime : timestamp (nullable = true)
|-- label : double (nullable = true)
是否可以在流中直接解析我的 json 以获得类似的东西:
root
|-- value : struct (nullable : true)
|-- id : string (nullable = true)
|-- DateTime : timestamp (nullable = true)
|-- label : double (nullable = true)
|-- timestamp : timestamp (nullable : true)
目前,我看不出有任何方法可以从 mqtt 解析 json,任何帮助都会非常有用。
提前致谢。
我今天遇到了同样的问题!我使用 json4s 和 Jackson 来解析 json。
我如何获得流数据集(与您拥有的几乎相同):
val lines = spark.readStream
.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
.option("topic", topic)
.option("brokerUrl",brokerUrl)
.load().as[(String,Timestamp)]
我使用案例定义了架构 class:
case class DeviceData(devicename: String, time: Long, metric: String, value: Long, unit: String)
正在使用 org.json4s.jackson.JsonMethods.parse:
解析 JSON 列
val ds = lines.map {
row =>
implicit val format = DefaultFormats
parse(row._1).extract[DeviceData]
}
正在输出结果:
val query = ds.writeStream
.format("console")
.option("truncate", false)
.start()
结果:
+----------+-------------+-----------+-----+----+
|devicename|time |metric |value|unit|
+----------+-------------+-----------+-----+----+
|dht11_4 |1486656575772|temperature|9 |C |
|dht11_4 |1486656575772|humidity |36 |% |
+----------+-------------+-----------+-----+----+
我有点失望,我无法提出使用 Sparks 本机 json 解析的解决方案。相反,我们必须依靠杰克逊。如果您将文件作为流读取,则可以使用 spark native json 解析。因此:
val lines = spark.readStream
.....
.json("./path/to/file").as[(String,Timestamp)]
但是对于 MQTT 我们不能这样做。
我正在研究 DataBricks (Spark 2.0.1-db1 (Scala 2.11)),我正在尝试使用 Spark Streaming 函数。我正在使用库 :
- spark-sql-streaming-mqtt_2.11-2.1.0-SNAPSHOT.jar(参见此处:http://bahir.apache.org/docs/spark/current/spark-sql-streaming-mqtt/)
以下命令为我提供了一个数据集:
val lines = spark.readStream
.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
.option("clientId", "sparkTest")
.option("brokerUrl", "tcp://xxx.xxx.xxx.xxx:xxx")
.option("topic", "/Name/data")
.option("localStorage", "dbfs:/models/mqttPersist")
.option("cleanSession", "true")
.load().as[(String, Timestamp)]
使用此 printSchema :
root
|-- value : string (nullable : true)
|-- timestamp : timestamp (nullable : true)
我想在我的数据集的 "value" 列上应用模式。您可以看到我的 json 架构如下。
root
|-- id : string (nullable = true)
|-- DateTime : timestamp (nullable = true)
|-- label : double (nullable = true)
是否可以在流中直接解析我的 json 以获得类似的东西:
root
|-- value : struct (nullable : true)
|-- id : string (nullable = true)
|-- DateTime : timestamp (nullable = true)
|-- label : double (nullable = true)
|-- timestamp : timestamp (nullable : true)
目前,我看不出有任何方法可以从 mqtt 解析 json,任何帮助都会非常有用。
提前致谢。
我今天遇到了同样的问题!我使用 json4s 和 Jackson 来解析 json。
我如何获得流数据集(与您拥有的几乎相同):
val lines = spark.readStream
.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
.option("topic", topic)
.option("brokerUrl",brokerUrl)
.load().as[(String,Timestamp)]
我使用案例定义了架构 class:
case class DeviceData(devicename: String, time: Long, metric: String, value: Long, unit: String)
正在使用 org.json4s.jackson.JsonMethods.parse:
解析 JSON 列val ds = lines.map {
row =>
implicit val format = DefaultFormats
parse(row._1).extract[DeviceData]
}
正在输出结果:
val query = ds.writeStream
.format("console")
.option("truncate", false)
.start()
结果:
+----------+-------------+-----------+-----+----+
|devicename|time |metric |value|unit|
+----------+-------------+-----------+-----+----+
|dht11_4 |1486656575772|temperature|9 |C |
|dht11_4 |1486656575772|humidity |36 |% |
+----------+-------------+-----------+-----+----+
我有点失望,我无法提出使用 Sparks 本机 json 解析的解决方案。相反,我们必须依靠杰克逊。如果您将文件作为流读取,则可以使用 spark native json 解析。因此:
val lines = spark.readStream
.....
.json("./path/to/file").as[(String,Timestamp)]
但是对于 MQTT 我们不能这样做。