在 Spark Streaming 中将 RDD 转换为 Dataframe Python

Convert RDD to Dataframe in Spark Streaming Python

我正在尝试将 RDD 转换为 Spark Streaming 中的 DataFrame。我正在遵循以下过程。

socket_stream = ssc.socketTextStream("localhost", 9999)
def convert_to_df(rdd):
    schema = StructType([StructField("text", StringType(), True)])
    df =spark.createDataFrame(rdd, schema = schema)
    df.show(10)

socket_stream.foreachRDD(convert_to_df)

我通过套接字提供输入nc -lk 9999

如果我输入 "hello world",它会显示以下错误

StructType can not accept object 'hello world' in type <class 'str'>

预期输出

+-------=-+
|text     |
+---------+
hello world
+---------+

尝试ArrayType(StringType())

否则,由于您只有一列,请尝试直接将架构指定为

df =spark.createDataFrame(rdd, StringType())

检查 pyspark 的 udf,因为您需要为 spark 声明一个 udf

由于您使用 RDD[str],您应该提供匹配类型。对于原子值,它是相应的 AtomicType

from pyspark.sql.types import StringType, StructField, StructType

rdd = sc.parallelize(["hello world"])
spark.createDataFrame(rdd, StringType())

或其字符串描述:

spark.createDataFrame(rdd, "string")

如果你想先用StructType :

schema = StructType([StructField("text", StringType(), True)])

spark.createDataFrame(rdd.map(lambda x: (x, )), schema)

当然,如果您只想将每个批次转换为 DataFrame,那么始终使用结构化流更有意义:

lines = (spark
    .readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .load())