以样本形式流数据帧
take sample form streaming dataframe
我正在尝试将一个函数(适用于常规 spark 数据帧)应用到流数据。在应用此功能之前,我需要对给定数据使用 .rdd.takeSample() ,但这当然不适用于流数据帧。
我使用以下结构化流代码获取我的流数据:
dsraw = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "192.168.99.100:9092") \
.option("subscribe", "topic") \
.option("startingOffsets", "earliest") \
.load()
ds = dsraw.selectExpr("CAST(value AS STRING)")
我的数据是一组随机数,形式为{'number': 1}等。理想情况下,我想将从该流中读取的所有数字放入一个数据帧中,并且return它。
有没有办法将流数据帧转换为 spark 数据帧或 rdd?如果没有,是否有 takeSample 的替代方法?
一种方法是将流数据写入内存,
然后使用 spark sql
:
创建 dataframe/rdd
dsraw = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "192.168.99.100:9092") \
.option("subscribe", "topic") \
.option("startingOffsets", "earliest") \
.load()
ds = dsraw.selectExpr("CAST(value AS STRING)")
kafka_value_df = ds.selectExpr("CAST(value AS STRING)")
output_query = kafka_value_df.writeStream \
.queryName("numbers") \
.format("memory") \
.start()
output_query.awaitTermination(10)
value_df = spark.sql("select * from numbers") # df
value_rdd = value_df.rdd # rdd
我不知道你的原始数据到底是什么格式(只是 {'number': 1}
信息不够),你可能需要使用 map
或 json.loads
取决于你的数据获得 df/rdd.
所需的格式
我正在尝试将一个函数(适用于常规 spark 数据帧)应用到流数据。在应用此功能之前,我需要对给定数据使用 .rdd.takeSample() ,但这当然不适用于流数据帧。
我使用以下结构化流代码获取我的流数据:
dsraw = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "192.168.99.100:9092") \
.option("subscribe", "topic") \
.option("startingOffsets", "earliest") \
.load()
ds = dsraw.selectExpr("CAST(value AS STRING)")
我的数据是一组随机数,形式为{'number': 1}等。理想情况下,我想将从该流中读取的所有数字放入一个数据帧中,并且return它。
有没有办法将流数据帧转换为 spark 数据帧或 rdd?如果没有,是否有 takeSample 的替代方法?
一种方法是将流数据写入内存,
然后使用 spark sql
:
dsraw = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "192.168.99.100:9092") \
.option("subscribe", "topic") \
.option("startingOffsets", "earliest") \
.load()
ds = dsraw.selectExpr("CAST(value AS STRING)")
kafka_value_df = ds.selectExpr("CAST(value AS STRING)")
output_query = kafka_value_df.writeStream \
.queryName("numbers") \
.format("memory") \
.start()
output_query.awaitTermination(10)
value_df = spark.sql("select * from numbers") # df
value_rdd = value_df.rdd # rdd
我不知道你的原始数据到底是什么格式(只是 {'number': 1}
信息不够),你可能需要使用 map
或 json.loads
取决于你的数据获得 df/rdd.