使用套接字的 Spark Structured Streaming,设置 SCHEMA,在控制台中显示 DATAFRAME
Spark Structured Streaming using sockets, set SCHEMA, Display DATAFRAME in console
如何在 PySpark 中为流 DataFrame
设置架构。
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
# Import data types
from pyspark.sql.types import *
spark = SparkSession\
.builder\
.appName("StructuredNetworkWordCount")\
.getOrCreate()
# Create DataFrame representing the stream of input lines from connection to localhost:5560
lines = spark\
.readStream\
.format('socket')\
.option('host', '192.168.0.113')\
.option('port', 5560)\
.load()
例如我需要一个 table 比如:
Name, lastName, PhoneNumber
Bob, Dylan, 123456
Jack, Ma, 789456
....
如何将 header/schema 设置为 ['Name','lastName','PhoneNumber']
与他们的数据类型。
此外,是否可以连续显示此 table,或者说 DataFrame
的前 20 行。当我尝试它时,我收到错误
"pyspark.sql.utils.AnalysisException: 'Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;;\nProject"
TextSocketSource
不提供任何集成的解析选项。只能使用以下两种格式之一:
时间戳和文本,如果 includeTimestamp
设置为 true
,架构如下:
StructType([
StructField("value", StringType()),
StructField("timestamp", TimestampType())
])
仅当 includeTimestamp
设置为 false
且架构如下所示时才显示文本:
StructType([StructField("value", StringType())]))
如果您想更改此格式,则必须转换流以提取感兴趣的字段,例如使用正则表达式:
from pyspark.sql.functions import regexp_extract
from functools import partial
fields = partial(
regexp_extract, str="value", pattern="^(\w*)\s*,\s*(\w*)\s*,\s*([0-9]*)$"
)
lines.select(
fields(idx=1).alias("name"),
fields(idx=2).alias("last_name"),
fields(idx=3).alias("phone_number")
)
如何在 PySpark 中为流 DataFrame
设置架构。
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
# Import data types
from pyspark.sql.types import *
spark = SparkSession\
.builder\
.appName("StructuredNetworkWordCount")\
.getOrCreate()
# Create DataFrame representing the stream of input lines from connection to localhost:5560
lines = spark\
.readStream\
.format('socket')\
.option('host', '192.168.0.113')\
.option('port', 5560)\
.load()
例如我需要一个 table 比如:
Name, lastName, PhoneNumber
Bob, Dylan, 123456
Jack, Ma, 789456
....
如何将 header/schema 设置为 ['Name','lastName','PhoneNumber'] 与他们的数据类型。
此外,是否可以连续显示此 table,或者说 DataFrame
的前 20 行。当我尝试它时,我收到错误
"pyspark.sql.utils.AnalysisException: 'Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;;\nProject"
TextSocketSource
不提供任何集成的解析选项。只能使用以下两种格式之一:
时间戳和文本,如果
includeTimestamp
设置为true
,架构如下:StructType([ StructField("value", StringType()), StructField("timestamp", TimestampType()) ])
仅当
includeTimestamp
设置为false
且架构如下所示时才显示文本:StructType([StructField("value", StringType())]))
如果您想更改此格式,则必须转换流以提取感兴趣的字段,例如使用正则表达式:
from pyspark.sql.functions import regexp_extract
from functools import partial
fields = partial(
regexp_extract, str="value", pattern="^(\w*)\s*,\s*(\w*)\s*,\s*([0-9]*)$"
)
lines.select(
fields(idx=1).alias("name"),
fields(idx=2).alias("last_name"),
fields(idx=3).alias("phone_number")
)