如何使用 PySpark 转换结构化流?

How to transform structured streams with PySpark?

这看起来应该是显而易见的,但在查看文档和示例时,我不确定我能否找到一种方法来获取结构化流并使用 PySpark 进行转换。

例如:

from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName('StreamingWordCount')
    .getOrCreate()
)

raw_records = (
    spark
    .readStream
    .format('socket')
    .option('host', 'localhost')
    .option('port', 9999)
    .load()
)

# I realize there's a SQL function for upper-case, just illustrating a sample
# use of an arbitrary map function
records = raw_records.rdd.map(lambda w: w.upper()).toDF()

counts = (
    records
    .groupBy(records.value)
    .count()
)

query = (
    counts
    .writeStream
    .outputMode('complete')
    .format('console')
    .start()
)
query.awaitTermination()

这将抛出以下异常:

Queries with streaming sources must be executed with writeStream.start

但是,如果我删除对 rdd.map(...).toDF() 的调用,一切似乎都正常。

似乎对 rdd.map 的调用从流上下文分支执行并导致 Spark 警告它从未启动?

是否有 "right" 方法使用结构化流和 PySpark 应用 mapmapPartition 样式转换?

Structured Streaming 中应用的每个转换都必须完全包含在 Dataset 世界中 - 对于 PySpark,这意味着您只能使用 DataFrame 或 SQL 并转换为不支持 RDD(或 DStream 或本地集合)。

如果您想使用纯 Python 代码,您必须使用 UserDefinedFunction

from pyspark.sql.functions import udf

@udf
def to_upper(s)
    return s.upper()

raw_records.select(to_upper("value"))

另见

特定列 (column_name) 的另一种方法:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def to_uper(string):
    return string.upper()

to_upper_udf = udf(to_upper,StringType())

records = raw_records.withColumn("new_column_name"
                      ,to_upper_udf(raw_records['column_name']))\
                      .drop("column_name")