PySpark 平面图应该 return 具有类型化值的元组
PySpark flatmap should return tuples with typed values
我将 Jupyter Notebook 与 PySpark 结合使用。在其中我有一个数据框,它有一个模式,其中包含这些列的列名和类型(整数,...)。现在我使用像 flatMap 这样的方法,但是这个 returns 是一个不再有固定类型的元组列表。有办法实现吗?
df.printSchema()
root
|-- name: string (nullable = true)
|-- ...
|-- ...
|-- ratings: integer (nullable = true)
然后我使用 flatMap 对评级值进行一些计算(此处混淆):
df.flatMap(lambda row: (row.id, 5 if (row.ratings > 5) else row.ratings))
y_rate.toDF().printSchema()
现在我得到一个错误:
TypeError: Can not infer schema for type:
有什么方法可以通过保留架构来使用 map/flatMap/reduce 吗?或者至少返回具有特定类型值的元组?
首先你用错了函数。 flatMap
将 map
和 flatten
因此假设您的数据如下所示:
df = sc.parallelize([("foo", 0), ("bar", 10)]).toDF(["id", "ratings"])
flatMap
的输出将等同于:
sc.parallelize(['foo', 0, 'bar', 5])
因此出现您看到的错误。如果你真的想让它工作,你应该使用 map
:
df.rdd.map(lambda row: (row.id, 5 if (row.ratings > 5) else row.ratings)).toDF()
## DataFrame[_1: string, _2: bigint]
接下来,2.0 不再支持 DataFrame
上的映射。您应该先提取 rdd
(请参阅上面的 df.rdd.map
)。
最终在Python 和JVM 之间传递数据效率极低。它不仅需要在 Python 和 JVM 之间传递数据并进行相应的序列化/反序列化和模式推断(如果未明确提供模式),这也打破了惰性。对于这样的事情,最好使用 SQL 表达式:
from pyspark.sql.functions import when
df.select(df.id, when(df.ratings > 5, 5).otherwise(df.ratings))
如果出于某种原因您需要纯 Python 代码,UDF 可能是更好的选择。
我将 Jupyter Notebook 与 PySpark 结合使用。在其中我有一个数据框,它有一个模式,其中包含这些列的列名和类型(整数,...)。现在我使用像 flatMap 这样的方法,但是这个 returns 是一个不再有固定类型的元组列表。有办法实现吗?
df.printSchema()
root
|-- name: string (nullable = true)
|-- ...
|-- ...
|-- ratings: integer (nullable = true)
然后我使用 flatMap 对评级值进行一些计算(此处混淆):
df.flatMap(lambda row: (row.id, 5 if (row.ratings > 5) else row.ratings))
y_rate.toDF().printSchema()
现在我得到一个错误:
TypeError: Can not infer schema for type:
有什么方法可以通过保留架构来使用 map/flatMap/reduce 吗?或者至少返回具有特定类型值的元组?
首先你用错了函数。 flatMap
将 map
和 flatten
因此假设您的数据如下所示:
df = sc.parallelize([("foo", 0), ("bar", 10)]).toDF(["id", "ratings"])
flatMap
的输出将等同于:
sc.parallelize(['foo', 0, 'bar', 5])
因此出现您看到的错误。如果你真的想让它工作,你应该使用 map
:
df.rdd.map(lambda row: (row.id, 5 if (row.ratings > 5) else row.ratings)).toDF()
## DataFrame[_1: string, _2: bigint]
接下来,2.0 不再支持 DataFrame
上的映射。您应该先提取 rdd
(请参阅上面的 df.rdd.map
)。
最终在Python 和JVM 之间传递数据效率极低。它不仅需要在 Python 和 JVM 之间传递数据并进行相应的序列化/反序列化和模式推断(如果未明确提供模式),这也打破了惰性。对于这样的事情,最好使用 SQL 表达式:
from pyspark.sql.functions import when
df.select(df.id, when(df.ratings > 5, 5).otherwise(df.ratings))
如果出于某种原因您需要纯 Python 代码,UDF 可能是更好的选择。