带有 spark.createDataFrame 和 pyspark.ml.clustering 的 Pyspark Py4j IllegalArgumentException

Pyspark Py4j IllegalArgumentException with spark.createDataFrame and pyspark.ml.clustering

让我先公开我的问题的完整背景,我将有一个简化的 MWE,在底部重新创建相同的问题。请随意跳过我对设置的漫无边际,直接进入最后一部分。

我原来问题中的演员:

  1. 从 Amazon S3 读取的 spark 数据帧 data,其中一列 scaled_features 最终是 VectorAssembler 操作后跟 MinMaxScaler 的结果。
  2. 一个 spark 数据帧列 pca_features 是在 PCA 之后从上面的 df 列产生的,如下所示:
mat = RowMatrix(data.select('scaled_features').rdd.map(list))
pc = mat.computePrincipalComponents(2)
projected = mat.multiply(pc).rows.map(lambda x: (x, )).toDF().withColumnRenamed('_1', 'pca_features')
  1. BisectingKMeans 的两个实例适合上述数据框中的两个特征实例,如下所示:
kmeans_scaled = BisectingKMeans(featuresCol='scaled_features').setK(4).setSeed(1)
model1 = kmeans_scaled.fit(data)

kmeans_pca = BisectingKMeans(featuresCol='pca_features').setK(4).setSeed(1)
model2 = kmeans_pca.fit(projected)

问题:

虽然从我的第一个 df 中 BisectingKMeans 适合 scaled_features 没有问题,但在尝试适合投影特征时,它会出现以下错误

Py4JJavaError: An error occurred while calling o1413.fit.
: java.lang.IllegalArgumentException: requirement failed: Column features must be of type equal to one of the following types:
[struct<type:tinyint,size:int,indices:array<int>,values:array<double>>, array<double>, array<float>]
but was actually of type struct<type:tinyint,size:int,indices:array<int>,values:array<double>>.

如您所见,Py4J 抱怨我正在传递特定结构类型的数据,而该结构类型恰好是允许类型列表中指定的第一个类型。

附加调试信息:

我的 Spark 是 运行 版本 2.4.0

检查数据类型产生:data.dtypes: [('scaled_features', 'vector')]projected.dtypes: [('pca_features', 'vector')]。两个数据帧的模式也相同,仅打印一个以供参考:

root
 |-- scaled_features: vector (nullable = true)

重现错误 (MWE):

事实证明,可以通过从一些向量创建一个简单的数据框来重现同样的错误(我原来的 dfs 中的列也是 VectorType):

from pyspark.sql import Row
from pyspark.mllib.linalg import DenseVector
from pyspark.ml.clustering import BisectingKMeans

test_data = spark.createDataFrame([Row(test_features=DenseVector([43.0, 0.0, 200.0, 1.0, 1.0, 1.0, 0.0, 3.0])),
    Row(test_features=DenseVector([44.0, 0.0, 250.0, 1.0, 1.0, 1.0, 0.0, 1.0])),
    Row(test_features=DenseVector([23.0, 0.0, 0.0, 1.0, 1.0, 1.0, 0.0, 1.0])),
    Row(test_features=DenseVector([25.0, 0.0, 0.0, 1.0, 1.0, 1.0, 0.0, 2.0])),
    Row(test_features=DenseVector([19.0, 0.0, 200.0, 1.0, 0.0, 1.0, 0.0, 1.0]))])

kmeans_test = BisectingKMeans(featuresCol='test_features').setK(4).setSeed(1)
model3 = kmeans_test.fit(test_data)

最后一行导致了我在原始设置中遇到的相同错误。

谁能解释这个错误并提出纠正方法?

经过几天的调查,我被指出了问题的(相当尴尬的)原因:

Pyspark 有两个机器学习库:pyspark.mlpyspark.mllib,事实证明它们不能很好地结合在一起。将 from pyspark.mllib.linalg import DenseVector 替换为 from pyspark.ml.linalg import DenseVector 可解决所有问题。