PySpark 平面图应该 return 具有类型化值的元组

PySpark flatmap should return tuples with typed values

我将 Jupyter Notebook 与 PySpark 结合使用。在其中我有一个数据框,它有一个模式,其中包含这些列的列名和类型(整数,...)。现在我使用像 flatMap 这样的方法,但是这个 returns 是一个不再有固定类型的元组列表。有办法实现吗?

df.printSchema()
root
 |-- name: string (nullable = true)
 |-- ...
 |-- ...
 |-- ratings: integer (nullable = true)

然后我使用 flatMap 对评级值进行一些计算(此处混淆):

df.flatMap(lambda row: (row.id, 5 if (row.ratings > 5) else row.ratings))
y_rate.toDF().printSchema()

现在我得到一个错误:

TypeError: Can not infer schema for type:

有什么方法可以通过保留架构来使用 map/flatMap/reduce 吗?或者至少返回具有特定类型值的元组?

首先你用错了函数。 flatMapmapflatten 因此假设您的数据如下所示:

df = sc.parallelize([("foo", 0), ("bar", 10)]).toDF(["id", "ratings"])

flatMap 的输出将等同于:

sc.parallelize(['foo', 0, 'bar', 5])

因此出现您看到的错误。如果你真的想让它工作,你应该使用 map:

df.rdd.map(lambda row: (row.id, 5 if (row.ratings > 5) else row.ratings)).toDF()
## DataFrame[_1: string, _2: bigint]

接下来,2.0 不再支持 DataFrame 上的映射。您应该先提取 rdd(请参阅上面的 df.rdd.map)。

最终在Python 和JVM 之间传递数据效率极低。它不仅需要在 Python 和 JVM 之间传递数据并进行相应的序列化/反序列化和模式推断(如果未明确提供模式),这也打破了惰性。对于这样的事情,最好使用 SQL 表达式:

from pyspark.sql.functions import when

df.select(df.id, when(df.ratings > 5, 5).otherwise(df.ratings))

如果出于某种原因您需要纯 Python 代码,UDF 可能是更好的选择。