如何将 Spark 流输出转换为数据帧或存储在 table 中
How to convert spark streaming output into dataframe or storing in table
我的代码是:
val lines = KafkaUtils.createStream(ssc, "localhost:2181", "spark-streaming-consumer-group", Map("hello" -> 5))
val data=lines.map(_._2)
data.print()
我的输出有 50 个不同的值,格式如下
{"id:st04","data:26-02-2018 20:30:40","temp:30", "press:20"}
任何人都可以帮助我将这些数据以 table 形式存储为
| id |date |temp|press|
|st01|26-02-2018 20:30:40| 30 |20 |
|st01|26-02-2018 20:30:45| 80 |70 |
非常感谢。
您可以使用 foreachRDD 函数,连同普通数据集 API:
data.foreachRDD(rdd => {
// rdd is RDD[String]
// foreachRDD is executed on the driver, so you can use SparkSession here; spark is SparkSession, for Spark 1.x use SQLContext
val df = spark.read.json(rdd); // or sqlContext.read.json(rdd)
df.show();
df.write.saveAsTable("here some unique table ID");
});
但是,如果您使用 Spark 2.x,我建议您使用 Structured Streaming:
val stream = spark.readStream.format("kafka").load()
val data = stream
.selectExpr("cast(value as string) as value")
.select(from_json(col("value"), schema))
data.writeStream.format("console").start();
您必须手动指定架构,但这很简单:) 在任何处理之前也导入 org.apache.spark.sql.functions._
我的代码是:
val lines = KafkaUtils.createStream(ssc, "localhost:2181", "spark-streaming-consumer-group", Map("hello" -> 5))
val data=lines.map(_._2)
data.print()
我的输出有 50 个不同的值,格式如下
{"id:st04","data:26-02-2018 20:30:40","temp:30", "press:20"}
任何人都可以帮助我将这些数据以 table 形式存储为
| id |date |temp|press|
|st01|26-02-2018 20:30:40| 30 |20 |
|st01|26-02-2018 20:30:45| 80 |70 |
非常感谢。
您可以使用 foreachRDD 函数,连同普通数据集 API:
data.foreachRDD(rdd => {
// rdd is RDD[String]
// foreachRDD is executed on the driver, so you can use SparkSession here; spark is SparkSession, for Spark 1.x use SQLContext
val df = spark.read.json(rdd); // or sqlContext.read.json(rdd)
df.show();
df.write.saveAsTable("here some unique table ID");
});
但是,如果您使用 Spark 2.x,我建议您使用 Structured Streaming:
val stream = spark.readStream.format("kafka").load()
val data = stream
.selectExpr("cast(value as string) as value")
.select(from_json(col("value"), schema))
data.writeStream.format("console").start();
您必须手动指定架构,但这很简单:) 在任何处理之前也导入 org.apache.spark.sql.functions._