如何将 Spark Streaming 数据转换为 Spark DataFrame
How to convert Spark Streaming data into Spark DataFrame
到目前为止,Spark还没有创建流式数据的DataFrame,但是我在做异常检测的时候,使用DataFrame进行数据分析更加方便快捷。我已经完成了这部分,但是当我尝试使用流数据进行实时异常检测时,问题出现了。试了好几种方法,仍然无法将DStream转为DataFrame,也无法将DStream内部的RDD转为DataFrame
这是我最新版本的代码的一部分:
import sys
import re
from pyspark import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql import Row
from pyspark.streaming import StreamingContext
from pyspark.mllib.clustering import KMeans, KMeansModel, StreamingKMeans
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import udf
import operator
sc = SparkContext(appName="test")
ssc = StreamingContext(sc, 5)
sqlContext = SQLContext(sc)
model_inputs = sys.argv[1]
def streamrdd_to_df(srdd):
sdf = sqlContext.createDataFrame(srdd)
sdf.show(n=2, truncate=False)
return sdf
def main():
indata = ssc.socketTextStream(sys.argv[2], int(sys.argv[3]))
inrdd = indata.map(lambda r: get_tuple(r))
Features = Row('rawFeatures')
features_rdd = inrdd.map(lambda r: Features(r))
features_rdd.pprint(num=3)
streaming_df = features_rdd.flatMap(streamrdd_to_df)
ssc.start()
ssc.awaitTermination()
if __name__ == "__main__":
main()
正如你在 main() 函数中看到的,当我使用 ssc.socketTextStream() 方法读取输入流数据时,它会生成 DStream,然后我尝试将 DStream 中的每个个体转换为 Row,希望以后能把数据转换成DataFrame。
如果我在这里使用 ppprint() 打印出 features_rdd,它可以工作,这让我想到,features_rdd 中的每个个体都是一批 RDD,而整个 features_rdd是一个DStream。
然后我创建了 streamrdd_to_df() 方法并希望将每批 RDD 转换为数据帧,它给我错误,显示:
ERROR StreamingContext: Error starting the context, marking it as stopped
java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
有没有想过如何对 Spark 流数据进行 DataFrame 操作?
仔细阅读错误。它说没有注册输出操作。 Spark 是惰性的,只有当它有结果时才执行作业/代码。在你的程序中没有 "Output Operation" 并且 Spark 也抱怨同样的事情。
定义一个 foreach() 或 Raw SQL 对 DataFrame 的查询,然后打印结果。它会很好地工作。
Spark 为我们提供了structured streaming 可以解决此类问题。它可以生成流式数据帧,即连续附加的数据帧。请查看下方 link
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
你为什么不用这样的东西:
def socket_streamer(sc): # retruns a streamed dataframe
streamer = session.readStream\
.format("socket") \
.option("host", "localhost") \
.option("port", 9999) \
.load()
return streamer
上面这个函数的输出本身(或者一般的 readStream
)是一个 DataFrame。那里你不需要担心 df,它已经由 spark 自动创建。
见 Spark Structured Streaming Programming Guide
经过1年时间,我开始探索Spark 2.0的流式处理方法,终于解决了我的异常检测问题。 Here's my code in IPython, you can also find how does my raw data input look like
使用 Spark 2.3 / Python 3 / Scala 2.11(使用数据块)我能够在 Scala 中使用临时表和代码片段(在笔记本中使用魔法):
Python 部分:
ddf.createOrReplaceTempView("TempItems")
然后在一个新单元格上:
%scala
import java.sql.DriverManager
import org.apache.spark.sql.ForeachWriter
// Create the query to be persisted...
val tempItemsDF = spark.sql("SELECT field1, field2, field3 FROM TempItems")
val itemsQuery = tempItemsDF.writeStream.foreach(new ForeachWriter[Row]
{
def open(partitionId: Long, version: Long):Boolean = {
// Initializing DB connection / etc...
}
def process(value: Row): Unit = {
val field1 = value(0)
val field2 = value(1)
val field3 = value(2)
// Processing values ...
}
def close(errorOrNull: Throwable): Unit = {
// Closing connections etc...
}
})
val streamingQuery = itemsQuery.start()
不需要将DStream转为RDD。根据定义,DStream 是 RDD 的集合。只需使用 DStream 的方法 foreach() 遍历每个 RDD 并采取行动。
val conf = new SparkConf()
.setAppName("Sample")
val spark = SparkSession.builder.config(conf).getOrCreate()
sampleStream.foreachRDD(rdd => {
val sampleDataFrame = spark.read.json(rdd)
}
spark documentation 介绍了如何使用 DStream。基本上,您必须在流对象上使用 foreachRDD
才能与之交互。
这是一个示例(确保您创建了一个 spark 会话对象):
def process_stream(record, spark):
if not record.isEmpty():
df = spark.createDataFrame(record)
df.show()
def main():
sc = SparkContext(appName="PysparkStreaming")
spark = SparkSession(sc)
ssc = StreamingContext(sc, 5)
dstream = ssc.textFileStream(folder_path)
transformed_dstream = # transformations
transformed_dstream.foreachRDD(lambda rdd: process_stream(rdd, spark))
# ^^^^^^^^^^
ssc.start()
ssc.awaitTermination()
到目前为止,Spark还没有创建流式数据的DataFrame,但是我在做异常检测的时候,使用DataFrame进行数据分析更加方便快捷。我已经完成了这部分,但是当我尝试使用流数据进行实时异常检测时,问题出现了。试了好几种方法,仍然无法将DStream转为DataFrame,也无法将DStream内部的RDD转为DataFrame
这是我最新版本的代码的一部分:
import sys
import re
from pyspark import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql import Row
from pyspark.streaming import StreamingContext
from pyspark.mllib.clustering import KMeans, KMeansModel, StreamingKMeans
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import udf
import operator
sc = SparkContext(appName="test")
ssc = StreamingContext(sc, 5)
sqlContext = SQLContext(sc)
model_inputs = sys.argv[1]
def streamrdd_to_df(srdd):
sdf = sqlContext.createDataFrame(srdd)
sdf.show(n=2, truncate=False)
return sdf
def main():
indata = ssc.socketTextStream(sys.argv[2], int(sys.argv[3]))
inrdd = indata.map(lambda r: get_tuple(r))
Features = Row('rawFeatures')
features_rdd = inrdd.map(lambda r: Features(r))
features_rdd.pprint(num=3)
streaming_df = features_rdd.flatMap(streamrdd_to_df)
ssc.start()
ssc.awaitTermination()
if __name__ == "__main__":
main()
正如你在 main() 函数中看到的,当我使用 ssc.socketTextStream() 方法读取输入流数据时,它会生成 DStream,然后我尝试将 DStream 中的每个个体转换为 Row,希望以后能把数据转换成DataFrame。
如果我在这里使用 ppprint() 打印出 features_rdd,它可以工作,这让我想到,features_rdd 中的每个个体都是一批 RDD,而整个 features_rdd是一个DStream。
然后我创建了 streamrdd_to_df() 方法并希望将每批 RDD 转换为数据帧,它给我错误,显示:
ERROR StreamingContext: Error starting the context, marking it as stopped java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
有没有想过如何对 Spark 流数据进行 DataFrame 操作?
仔细阅读错误。它说没有注册输出操作。 Spark 是惰性的,只有当它有结果时才执行作业/代码。在你的程序中没有 "Output Operation" 并且 Spark 也抱怨同样的事情。
定义一个 foreach() 或 Raw SQL 对 DataFrame 的查询,然后打印结果。它会很好地工作。
Spark 为我们提供了structured streaming 可以解决此类问题。它可以生成流式数据帧,即连续附加的数据帧。请查看下方 link
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
你为什么不用这样的东西:
def socket_streamer(sc): # retruns a streamed dataframe
streamer = session.readStream\
.format("socket") \
.option("host", "localhost") \
.option("port", 9999) \
.load()
return streamer
上面这个函数的输出本身(或者一般的 readStream
)是一个 DataFrame。那里你不需要担心 df,它已经由 spark 自动创建。
见 Spark Structured Streaming Programming Guide
经过1年时间,我开始探索Spark 2.0的流式处理方法,终于解决了我的异常检测问题。 Here's my code in IPython, you can also find how does my raw data input look like
使用 Spark 2.3 / Python 3 / Scala 2.11(使用数据块)我能够在 Scala 中使用临时表和代码片段(在笔记本中使用魔法):
Python 部分:
ddf.createOrReplaceTempView("TempItems")
然后在一个新单元格上:
%scala
import java.sql.DriverManager
import org.apache.spark.sql.ForeachWriter
// Create the query to be persisted...
val tempItemsDF = spark.sql("SELECT field1, field2, field3 FROM TempItems")
val itemsQuery = tempItemsDF.writeStream.foreach(new ForeachWriter[Row]
{
def open(partitionId: Long, version: Long):Boolean = {
// Initializing DB connection / etc...
}
def process(value: Row): Unit = {
val field1 = value(0)
val field2 = value(1)
val field3 = value(2)
// Processing values ...
}
def close(errorOrNull: Throwable): Unit = {
// Closing connections etc...
}
})
val streamingQuery = itemsQuery.start()
不需要将DStream转为RDD。根据定义,DStream 是 RDD 的集合。只需使用 DStream 的方法 foreach() 遍历每个 RDD 并采取行动。
val conf = new SparkConf()
.setAppName("Sample")
val spark = SparkSession.builder.config(conf).getOrCreate()
sampleStream.foreachRDD(rdd => {
val sampleDataFrame = spark.read.json(rdd)
}
spark documentation 介绍了如何使用 DStream。基本上,您必须在流对象上使用 foreachRDD
才能与之交互。
这是一个示例(确保您创建了一个 spark 会话对象):
def process_stream(record, spark):
if not record.isEmpty():
df = spark.createDataFrame(record)
df.show()
def main():
sc = SparkContext(appName="PysparkStreaming")
spark = SparkSession(sc)
ssc = StreamingContext(sc, 5)
dstream = ssc.textFileStream(folder_path)
transformed_dstream = # transformations
transformed_dstream.foreachRDD(lambda rdd: process_stream(rdd, spark))
# ^^^^^^^^^^
ssc.start()
ssc.awaitTermination()