将 Spark Stream 从 Socket 转换为 DataFrame
Turn Spark Stream from Socket into DataFrame
我与我的 SparkSession 建立了套接字连接,将一行 .csv 文件发送到我的流。
到目前为止,我的 (PySpark-) 代码如下所示:
stream = spark.readStream.format('socket').option('host', 'localhost').option('port', 5555).load()
stream.writeStream.format('console').start().awaitTermination()
这会像这样在一列中打印 .csv 文件的行:
+-----------------+
| value|
+-----------------+
|[2, C4653, C5030]|
+-----------------+
但我真正想要的是:
+-----+-----+-----+
| col1| col2| col3|
+-----+-----+-----+
| 2|C4653|C5030|
+-----+-----+-----+
我想将其用作 DataFrame 以提供 ML 管道。
如何处理传入的流数据?
您已经有一个数据框stream,只需要更改架构。
只需在 load() 调用后添加此转换:
stream.selectExpr("split(value, ' ')[0] as col1","split(value, ' ')[1] as col2", "split(value, ' ')[2] as col3")
我与我的 SparkSession 建立了套接字连接,将一行 .csv 文件发送到我的流。
到目前为止,我的 (PySpark-) 代码如下所示:
stream = spark.readStream.format('socket').option('host', 'localhost').option('port', 5555).load()
stream.writeStream.format('console').start().awaitTermination()
这会像这样在一列中打印 .csv 文件的行:
+-----------------+
| value|
+-----------------+
|[2, C4653, C5030]|
+-----------------+
但我真正想要的是:
+-----+-----+-----+
| col1| col2| col3|
+-----+-----+-----+
| 2|C4653|C5030|
+-----+-----+-----+
我想将其用作 DataFrame 以提供 ML 管道。
如何处理传入的流数据?
您已经有一个数据框stream,只需要更改架构。
只需在 load() 调用后添加此转换:
stream.selectExpr("split(value, ' ')[0] as col1","split(value, ' ')[1] as col2", "split(value, ' ')[2] as col3")