如何使用 PySpark 转换结构化流?
How to transform structured streams with PySpark?
这看起来应该是显而易见的,但在查看文档和示例时,我不确定我能否找到一种方法来获取结构化流并使用 PySpark 进行转换。
例如:
from pyspark.sql import SparkSession
spark = (
SparkSession
.builder
.appName('StreamingWordCount')
.getOrCreate()
)
raw_records = (
spark
.readStream
.format('socket')
.option('host', 'localhost')
.option('port', 9999)
.load()
)
# I realize there's a SQL function for upper-case, just illustrating a sample
# use of an arbitrary map function
records = raw_records.rdd.map(lambda w: w.upper()).toDF()
counts = (
records
.groupBy(records.value)
.count()
)
query = (
counts
.writeStream
.outputMode('complete')
.format('console')
.start()
)
query.awaitTermination()
这将抛出以下异常:
Queries with streaming sources must be executed with writeStream.start
但是,如果我删除对 rdd.map(...).toDF()
的调用,一切似乎都正常。
似乎对 rdd.map
的调用从流上下文分支执行并导致 Spark 警告它从未启动?
是否有 "right" 方法使用结构化流和 PySpark 应用 map
或 mapPartition
样式转换?
Structured Streaming 中应用的每个转换都必须完全包含在 Dataset
世界中 - 对于 PySpark,这意味着您只能使用 DataFrame
或 SQL 并转换为不支持 RDD
(或 DStream
或本地集合)。
如果您想使用纯 Python 代码,您必须使用 UserDefinedFunction
。
from pyspark.sql.functions import udf
@udf
def to_upper(s)
return s.upper()
raw_records.select(to_upper("value"))
另见
特定列 (column_name) 的另一种方法:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def to_uper(string):
return string.upper()
to_upper_udf = udf(to_upper,StringType())
records = raw_records.withColumn("new_column_name"
,to_upper_udf(raw_records['column_name']))\
.drop("column_name")
这看起来应该是显而易见的,但在查看文档和示例时,我不确定我能否找到一种方法来获取结构化流并使用 PySpark 进行转换。
例如:
from pyspark.sql import SparkSession
spark = (
SparkSession
.builder
.appName('StreamingWordCount')
.getOrCreate()
)
raw_records = (
spark
.readStream
.format('socket')
.option('host', 'localhost')
.option('port', 9999)
.load()
)
# I realize there's a SQL function for upper-case, just illustrating a sample
# use of an arbitrary map function
records = raw_records.rdd.map(lambda w: w.upper()).toDF()
counts = (
records
.groupBy(records.value)
.count()
)
query = (
counts
.writeStream
.outputMode('complete')
.format('console')
.start()
)
query.awaitTermination()
这将抛出以下异常:
Queries with streaming sources must be executed with writeStream.start
但是,如果我删除对 rdd.map(...).toDF()
的调用,一切似乎都正常。
似乎对 rdd.map
的调用从流上下文分支执行并导致 Spark 警告它从未启动?
是否有 "right" 方法使用结构化流和 PySpark 应用 map
或 mapPartition
样式转换?
Structured Streaming 中应用的每个转换都必须完全包含在 Dataset
世界中 - 对于 PySpark,这意味着您只能使用 DataFrame
或 SQL 并转换为不支持 RDD
(或 DStream
或本地集合)。
如果您想使用纯 Python 代码,您必须使用 UserDefinedFunction
。
from pyspark.sql.functions import udf
@udf
def to_upper(s)
return s.upper()
raw_records.select(to_upper("value"))
另见
特定列 (column_name) 的另一种方法:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def to_uper(string):
return string.upper()
to_upper_udf = udf(to_upper,StringType())
records = raw_records.withColumn("new_column_name"
,to_upper_udf(raw_records['column_name']))\
.drop("column_name")