如何在 Java 的 spark streaming 中解析复杂的 JSON 数据
How to parse complex JSON Data in spark streaming in Java
我们正在开发物联网应用程序
我们从每个要运行分析的设备中获取以下数据流,
[{"t":1481368346000,"sensors":[{"s":"s1","d":"+149.625"},{"s":"s2","d":"+23.062"},{"s":"s3","d":"+16.375"},{"s":"s4","d":"+235.937"},{"s":"s5","d":"+271.437"},{"s":"s6","d":"+265.937"},{"s":"s7","d":"+295.562"},{"s":"s8","d":"+301.687"}]}]
在初级阶段,我可以使用 spark java 代码获取架构,如下所示,
root
|-- sensors: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- d: string (nullable = true)
| | |-- s: string (nullable = true)
|-- t: long (nullable = true)
我写的代码是,
JavaDStream<String> json = directKafkaStream.map(new Function<Tuple2<String,String>, String>() {
public String call(Tuple2<String,String> message) throws Exception {
return message._2();
};
});
SQLContext sqlContext = spark.sqlContext();
json.foreachRDD(new VoidFunction<JavaRDD<String>>() {
@Override
public void call(JavaRDD<String> jsonRecord) throws Exception {
Dataset<Row> row = sqlContext.read().json(jsonRecord).toDF();
row.createOrReplaceTempView("MyTable");
row.printSchema();
row.show();
Dataset<Row> sensors = row.select("sensors");
sensors.createOrReplaceTempView("sensors");
sensors.printSchema();
sensors.show();
}
});
这给了我错误 "org.apache.spark.sql.AnalysisException: cannot resolve 'sensors
' given input columns: [];"
我是 spark 和分析的初学者,无法在 java 中找到任何用于解析嵌套 json 的好例子。
我正在努力实现的是并且可能需要专家的建议是,
我将提取每个传感器值,然后使用 spark 的 sparkML 库进行 运行 回归分析。这将帮助我找出每个传感器流中发生的趋势,以及我想使用该数据检测故障。
我不确定哪种方法最好,任何指导、链接和信息都会很有帮助。
这是您的 json.foreachRDD
的样子。
json.foreachRDD(new VoidFunction<JavaRDD<String>>() {
@Override
public void call(JavaRDD<String> rdd) {
if(!rdd.isEmpty()){
Dataset<Row> data = spark.read().json(rdd).select("sensors");
data.printSchema();
data.show(false);
//DF in table
Dataset<Row> df = data.select( org.apache.spark.sql.functions.explode(org.apache.spark.sql.functions.col("sensors"))).toDF("sensors").select("sensors.s","sensors.d");
df.show(false);
}
}
});
回归分析样本,可参考JavaRandomForestRegressorExample.java
at https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/ml/JavaRandomForestRegressorExample.java
使用spark machine learning和spark streaming进行实时数据分析,可以参考以下文章。
我们正在开发物联网应用程序
我们从每个要运行分析的设备中获取以下数据流,
[{"t":1481368346000,"sensors":[{"s":"s1","d":"+149.625"},{"s":"s2","d":"+23.062"},{"s":"s3","d":"+16.375"},{"s":"s4","d":"+235.937"},{"s":"s5","d":"+271.437"},{"s":"s6","d":"+265.937"},{"s":"s7","d":"+295.562"},{"s":"s8","d":"+301.687"}]}]
在初级阶段,我可以使用 spark java 代码获取架构,如下所示,
root
|-- sensors: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- d: string (nullable = true)
| | |-- s: string (nullable = true)
|-- t: long (nullable = true)
我写的代码是,
JavaDStream<String> json = directKafkaStream.map(new Function<Tuple2<String,String>, String>() {
public String call(Tuple2<String,String> message) throws Exception {
return message._2();
};
});
SQLContext sqlContext = spark.sqlContext();
json.foreachRDD(new VoidFunction<JavaRDD<String>>() {
@Override
public void call(JavaRDD<String> jsonRecord) throws Exception {
Dataset<Row> row = sqlContext.read().json(jsonRecord).toDF();
row.createOrReplaceTempView("MyTable");
row.printSchema();
row.show();
Dataset<Row> sensors = row.select("sensors");
sensors.createOrReplaceTempView("sensors");
sensors.printSchema();
sensors.show();
}
});
这给了我错误 "org.apache.spark.sql.AnalysisException: cannot resolve 'sensors
' given input columns: [];"
我是 spark 和分析的初学者,无法在 java 中找到任何用于解析嵌套 json 的好例子。
我正在努力实现的是并且可能需要专家的建议是,
我将提取每个传感器值,然后使用 spark 的 sparkML 库进行 运行 回归分析。这将帮助我找出每个传感器流中发生的趋势,以及我想使用该数据检测故障。
我不确定哪种方法最好,任何指导、链接和信息都会很有帮助。
这是您的 json.foreachRDD
的样子。
json.foreachRDD(new VoidFunction<JavaRDD<String>>() {
@Override
public void call(JavaRDD<String> rdd) {
if(!rdd.isEmpty()){
Dataset<Row> data = spark.read().json(rdd).select("sensors");
data.printSchema();
data.show(false);
//DF in table
Dataset<Row> df = data.select( org.apache.spark.sql.functions.explode(org.apache.spark.sql.functions.col("sensors"))).toDF("sensors").select("sensors.s","sensors.d");
df.show(false);
}
}
});
回归分析样本,可参考JavaRandomForestRegressorExample.java
at https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/ml/JavaRandomForestRegressorExample.java
使用spark machine learning和spark streaming进行实时数据分析,可以参考以下文章。