在 Spark Streaming 中将 RDD 转换为 Dataframe Python
Convert RDD to Dataframe in Spark Streaming Python
我正在尝试将 RDD 转换为 Spark Streaming 中的 DataFrame。我正在遵循以下过程。
socket_stream = ssc.socketTextStream("localhost", 9999)
def convert_to_df(rdd):
schema = StructType([StructField("text", StringType(), True)])
df =spark.createDataFrame(rdd, schema = schema)
df.show(10)
socket_stream.foreachRDD(convert_to_df)
我通过套接字提供输入nc -lk 9999
如果我输入 "hello world",它会显示以下错误
StructType can not accept object 'hello world' in type <class 'str'>
预期输出
+-------=-+
|text |
+---------+
hello world
+---------+
尝试ArrayType(StringType())
否则,由于您只有一列,请尝试直接将架构指定为
df =spark.createDataFrame(rdd, StringType())
检查 pyspark 的 udf,因为您需要为 spark 声明一个 udf
由于您使用 RDD[str]
,您应该提供匹配类型。对于原子值,它是相应的 AtomicType
from pyspark.sql.types import StringType, StructField, StructType
rdd = sc.parallelize(["hello world"])
spark.createDataFrame(rdd, StringType())
或其字符串描述:
spark.createDataFrame(rdd, "string")
如果你想先用StructType
:
schema = StructType([StructField("text", StringType(), True)])
spark.createDataFrame(rdd.map(lambda x: (x, )), schema)
当然,如果您只想将每个批次转换为 DataFrame
,那么始终使用结构化流更有意义:
lines = (spark
.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load())
我正在尝试将 RDD 转换为 Spark Streaming 中的 DataFrame。我正在遵循以下过程。
socket_stream = ssc.socketTextStream("localhost", 9999)
def convert_to_df(rdd):
schema = StructType([StructField("text", StringType(), True)])
df =spark.createDataFrame(rdd, schema = schema)
df.show(10)
socket_stream.foreachRDD(convert_to_df)
我通过套接字提供输入nc -lk 9999
如果我输入 "hello world",它会显示以下错误
StructType can not accept object 'hello world' in type <class 'str'>
预期输出
+-------=-+
|text |
+---------+
hello world
+---------+
尝试ArrayType(StringType())
否则,由于您只有一列,请尝试直接将架构指定为
df =spark.createDataFrame(rdd, StringType())
检查 pyspark 的 udf,因为您需要为 spark 声明一个 udf
由于您使用 RDD[str]
,您应该提供匹配类型。对于原子值,它是相应的 AtomicType
from pyspark.sql.types import StringType, StructField, StructType
rdd = sc.parallelize(["hello world"])
spark.createDataFrame(rdd, StringType())
或其字符串描述:
spark.createDataFrame(rdd, "string")
如果你想先用StructType
schema = StructType([StructField("text", StringType(), True)])
spark.createDataFrame(rdd.map(lambda x: (x, )), schema)
当然,如果您只想将每个批次转换为 DataFrame
,那么始终使用结构化流更有意义:
lines = (spark
.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load())