带有 spark.createDataFrame 和 pyspark.ml.clustering 的 Pyspark Py4j IllegalArgumentException
Pyspark Py4j IllegalArgumentException with spark.createDataFrame and pyspark.ml.clustering
让我先公开我的问题的完整背景,我将有一个简化的 MWE,在底部重新创建相同的问题。请随意跳过我对设置的漫无边际,直接进入最后一部分。
我原来问题中的演员:
- 从 Amazon S3 读取的 spark 数据帧
data
,其中一列 scaled_features
最终是 VectorAssembler
操作后跟 MinMaxScaler
的结果。
- 一个 spark 数据帧列
pca_features
是在 PCA 之后从上面的 df 列产生的,如下所示:
mat = RowMatrix(data.select('scaled_features').rdd.map(list))
pc = mat.computePrincipalComponents(2)
projected = mat.multiply(pc).rows.map(lambda x: (x, )).toDF().withColumnRenamed('_1', 'pca_features')
BisectingKMeans
的两个实例适合上述数据框中的两个特征实例,如下所示:
kmeans_scaled = BisectingKMeans(featuresCol='scaled_features').setK(4).setSeed(1)
model1 = kmeans_scaled.fit(data)
kmeans_pca = BisectingKMeans(featuresCol='pca_features').setK(4).setSeed(1)
model2 = kmeans_pca.fit(projected)
问题:
虽然从我的第一个 df 中 BisectingKMeans
适合 scaled_features
没有问题,但在尝试适合投影特征时,它会出现以下错误
Py4JJavaError: An error occurred while calling o1413.fit.
: java.lang.IllegalArgumentException: requirement failed: Column features must be of type equal to one of the following types:
[struct<type:tinyint,size:int,indices:array<int>,values:array<double>>, array<double>, array<float>]
but was actually of type struct<type:tinyint,size:int,indices:array<int>,values:array<double>>.
如您所见,Py4J 抱怨我正在传递特定结构类型的数据,而该结构类型恰好是允许类型列表中指定的第一个类型。
附加调试信息:
我的 Spark 是 运行 版本 2.4.0
检查数据类型产生:data.dtypes: [('scaled_features', 'vector')]
和 projected.dtypes: [('pca_features', 'vector')]
。两个数据帧的模式也相同,仅打印一个以供参考:
root
|-- scaled_features: vector (nullable = true)
重现错误 (MWE):
事实证明,可以通过从一些向量创建一个简单的数据框来重现同样的错误(我原来的 dfs 中的列也是 VectorType):
from pyspark.sql import Row
from pyspark.mllib.linalg import DenseVector
from pyspark.ml.clustering import BisectingKMeans
test_data = spark.createDataFrame([Row(test_features=DenseVector([43.0, 0.0, 200.0, 1.0, 1.0, 1.0, 0.0, 3.0])),
Row(test_features=DenseVector([44.0, 0.0, 250.0, 1.0, 1.0, 1.0, 0.0, 1.0])),
Row(test_features=DenseVector([23.0, 0.0, 0.0, 1.0, 1.0, 1.0, 0.0, 1.0])),
Row(test_features=DenseVector([25.0, 0.0, 0.0, 1.0, 1.0, 1.0, 0.0, 2.0])),
Row(test_features=DenseVector([19.0, 0.0, 200.0, 1.0, 0.0, 1.0, 0.0, 1.0]))])
kmeans_test = BisectingKMeans(featuresCol='test_features').setK(4).setSeed(1)
model3 = kmeans_test.fit(test_data)
最后一行导致了我在原始设置中遇到的相同错误。
谁能解释这个错误并提出纠正方法?
经过几天的调查,我被指出了问题的(相当尴尬的)原因:
Pyspark 有两个机器学习库:pyspark.ml
和 pyspark.mllib
,事实证明它们不能很好地结合在一起。将 from pyspark.mllib.linalg import DenseVector
替换为 from pyspark.ml.linalg import DenseVector
可解决所有问题。
让我先公开我的问题的完整背景,我将有一个简化的 MWE,在底部重新创建相同的问题。请随意跳过我对设置的漫无边际,直接进入最后一部分。
我原来问题中的演员:
- 从 Amazon S3 读取的 spark 数据帧
data
,其中一列scaled_features
最终是VectorAssembler
操作后跟MinMaxScaler
的结果。 - 一个 spark 数据帧列
pca_features
是在 PCA 之后从上面的 df 列产生的,如下所示:
mat = RowMatrix(data.select('scaled_features').rdd.map(list))
pc = mat.computePrincipalComponents(2)
projected = mat.multiply(pc).rows.map(lambda x: (x, )).toDF().withColumnRenamed('_1', 'pca_features')
BisectingKMeans
的两个实例适合上述数据框中的两个特征实例,如下所示:
kmeans_scaled = BisectingKMeans(featuresCol='scaled_features').setK(4).setSeed(1)
model1 = kmeans_scaled.fit(data)
kmeans_pca = BisectingKMeans(featuresCol='pca_features').setK(4).setSeed(1)
model2 = kmeans_pca.fit(projected)
问题:
虽然从我的第一个 df 中 BisectingKMeans
适合 scaled_features
没有问题,但在尝试适合投影特征时,它会出现以下错误
Py4JJavaError: An error occurred while calling o1413.fit.
: java.lang.IllegalArgumentException: requirement failed: Column features must be of type equal to one of the following types:
[struct<type:tinyint,size:int,indices:array<int>,values:array<double>>, array<double>, array<float>]
but was actually of type struct<type:tinyint,size:int,indices:array<int>,values:array<double>>.
如您所见,Py4J 抱怨我正在传递特定结构类型的数据,而该结构类型恰好是允许类型列表中指定的第一个类型。
附加调试信息:
我的 Spark 是 运行 版本 2.4.0
检查数据类型产生:data.dtypes: [('scaled_features', 'vector')]
和 projected.dtypes: [('pca_features', 'vector')]
。两个数据帧的模式也相同,仅打印一个以供参考:
root
|-- scaled_features: vector (nullable = true)
重现错误 (MWE):
事实证明,可以通过从一些向量创建一个简单的数据框来重现同样的错误(我原来的 dfs 中的列也是 VectorType):
from pyspark.sql import Row
from pyspark.mllib.linalg import DenseVector
from pyspark.ml.clustering import BisectingKMeans
test_data = spark.createDataFrame([Row(test_features=DenseVector([43.0, 0.0, 200.0, 1.0, 1.0, 1.0, 0.0, 3.0])),
Row(test_features=DenseVector([44.0, 0.0, 250.0, 1.0, 1.0, 1.0, 0.0, 1.0])),
Row(test_features=DenseVector([23.0, 0.0, 0.0, 1.0, 1.0, 1.0, 0.0, 1.0])),
Row(test_features=DenseVector([25.0, 0.0, 0.0, 1.0, 1.0, 1.0, 0.0, 2.0])),
Row(test_features=DenseVector([19.0, 0.0, 200.0, 1.0, 0.0, 1.0, 0.0, 1.0]))])
kmeans_test = BisectingKMeans(featuresCol='test_features').setK(4).setSeed(1)
model3 = kmeans_test.fit(test_data)
最后一行导致了我在原始设置中遇到的相同错误。
谁能解释这个错误并提出纠正方法?
经过几天的调查,我被指出了问题的(相当尴尬的)原因:
Pyspark 有两个机器学习库:pyspark.ml
和 pyspark.mllib
,事实证明它们不能很好地结合在一起。将 from pyspark.mllib.linalg import DenseVector
替换为 from pyspark.ml.linalg import DenseVector
可解决所有问题。