将 Spark Stream 从 Socket 转换为 DataFrame

Turn Spark Stream from Socket into DataFrame

我与我的 SparkSession 建立了套接字连接,将一行 .csv 文件发送到我的流。

到目前为止,我的 (PySpark-) 代码如下所示:

stream = spark.readStream.format('socket').option('host', 'localhost').option('port', 5555).load()

stream.writeStream.format('console').start().awaitTermination()

这会像这样在一列中打印 .csv 文件的行:

+-----------------+
|            value|
+-----------------+
|[2, C4653, C5030]|
+-----------------+

但我真正想要的是:

+-----+-----+-----+
| col1| col2| col3|
+-----+-----+-----+
|    2|C4653|C5030|
+-----+-----+-----+

我想将其用作 DataFrame 以提供 ML 管道。

如何处理传入的流数据?

您已经有一个数据框stream,只需要更改架构。

只需在 load() 调用后添加此转换:

 stream.selectExpr("split(value, ' ')[0] as col1","split(value, ' ')[1] as col2", "split(value, ' ')[2] as col3")