PySpark 中的 KMeans 聚类
KMeans clustering in PySpark
我有一个包含许多列的 spark 数据框 'mydataframe'。我试图 运行 仅在两列上进行 kmeans:纬度和经度(纬度和经度),将它们用作简单值)。我想仅基于这 2 列提取 7 个簇,然后我想将簇分配附加到我的原始数据框。我试过:
from numpy import array
from math import sqrt
from pyspark.mllib.clustering import KMeans, KMeansModel
# Prepare a data frame with just 2 columns:
data = mydataframe.select('lat', 'long')
data_rdd = data.rdd # needs to be an RDD
data_rdd.cache()
# Build the model (cluster the data)
clusters = KMeans.train(data_rdd, 7, maxIterations=15, initializationMode="random")
但过了一会儿我收到错误消息:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 5191.0 failed 4 times, most recent failure: Lost task 1.3 in stage 5191.0 (TID 260738, 10.19.211.69, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last)
我已尝试分离并重新连接集群。同样的结果。我做错了什么?
因为,基于 , I guess you are in your very first steps with Spark clustering (you are even importing sqrt
& array
, without ever using them, probably because it is like that in the docs example),让我提供更笼统的建议,而不是针对您在这里提出的具体问题(希望也能避免您随后再打开 3-4 个问题,尝试将您的集群分配返回到您的数据框中)...
自从
你的数据已经在数据框中了
您想将集群成员资格附加回您的初始
数据框
你没有理由恢复到 RDD 并使用 (soon to be deprecated) MLlib 包;使用(现在推荐的)ML 包,您将更轻松、优雅和高效地完成您的工作,它直接与数据帧一起工作。
第 0 步 - 制作一些类似于您的玩具数据:
spark.version
# u'2.2.0'
df = spark.createDataFrame([[0, 33.3, -17.5],
[1, 40.4, -20.5],
[2, 28., -23.9],
[3, 29.5, -19.0],
[4, 32.8, -18.84]
],
["other","lat", "long"])
df.show()
# +-----+----+------+
# |other| lat| long|
# +-----+----+------+
# | 0|33.3| -17.5|
# | 1|40.4| -20.5|
# | 2|28.0| -23.9|
# | 3|29.5| -19.0|
# | 4|32.8|-18.84|
# +-----+----+------+
步骤 1 - assemble 您的特征
与大多数 ML 包相比,Spark ML 要求将您的输入特征收集到数据框的 单列 中,通常命名为 features
;它提供了一个特定的方法来做到这一点,VectorAssembler
:
from pyspark.ml.feature import VectorAssembler
vecAssembler = VectorAssembler(inputCols=["lat", "long"], outputCol="features")
new_df = vecAssembler.transform(df)
new_df.show()
# +-----+----+------+-------------+
# |other| lat| long| features|
# +-----+----+------+-------------+
# | 0|33.3| -17.5| [33.3,-17.5]|
# | 1|40.4| -20.5| [40.4,-20.5]|
# | 2|28.0| -23.9| [28.0,-23.9]|
# | 3|29.5| -19.0| [29.5,-19.0]|
# | 4|32.8|-18.84|[32.8,-18.84]|
# +-----+----+------+-------------+
正如可能已经猜到的那样,参数 inputCols
用于告诉 VectoeAssembler
我们数据框中的哪些特定列将用作特征。
步骤 2 - 拟合您的 KMeans 模型
from pyspark.ml.clustering import KMeans
kmeans = KMeans(k=2, seed=1) # 2 clusters here
model = kmeans.fit(new_df.select('features'))
select('features')
在这里用于告诉算法将数据帧的哪一列用于聚类 - 请记住,在上面的第 1 步之后,您的原始 lat
和 long
特征没有更直接使用。
步骤 3 - 转换您的初始数据框以包含聚类分配
transformed = model.transform(new_df)
transformed.show()
# +-----+----+------+-------------+----------+
# |other| lat| long| features|prediction|
# +-----+----+------+-------------+----------+
# | 0|33.3| -17.5| [33.3,-17.5]| 0|
# | 1|40.4| -20.5| [40.4,-20.5]| 1|
# | 2|28.0| -23.9| [28.0,-23.9]| 0|
# | 3|29.5| -19.0| [29.5,-19.0]| 0|
# | 4|32.8|-18.84|[32.8,-18.84]| 0|
# +-----+----+------+-------------+----------+
transformed
数据框的最后一列 prediction
显示了聚类分配 - 在我的玩具箱中,我在聚类 #0 中有 4 条记录,在聚类 # 中有 1 条记录。 1.
您可以使用 select
语句进一步操作 transformed
数据框,甚至 drop
features
列(现在已经完成其功能,可能不再必要)...
希望你现在更接近你最初真正想要实现的目标。对于提取集群统计信息等, 可能会有所帮助...
尽管我给出了其他一般性回答,但如果您出于某种原因必须坚持使用 MLlib 和 RDD,以下是导致您使用相同玩具时出错的原因 df
。
当您将数据框中的 select
列转换为 RDD 时,结果是 行 :
的 RDD
df.select('lat', 'long').rdd.collect()
# [Row(lat=33.3, long=-17.5), Row(lat=40.4, long=-20.5), Row(lat=28.0, long=-23.9), Row(lat=29.5, long=-19.0), Row(lat=32.8, long=-18.84)]
不适合作为 MLlib KMeans 的输入。您需要 map
操作才能正常工作:
df.select('lat', 'long').rdd.map(lambda x: (x[0], x[1])).collect()
# [(33.3, -17.5), (40.4, -20.5), (28.0, -23.9), (29.5, -19.0), (32.8, -18.84)]
所以,你的代码应该是这样的:
from pyspark.mllib.clustering import KMeans, KMeansModel
rdd = df.select('lat', 'long').rdd.map(lambda x: (x[0], x[1]))
clusters = KMeans.train(rdd, 2, maxIterations=10, initializationMode="random") # works OK
clusters.centers
# [array([ 40.4, -20.5]), array([ 30.9 , -19.81])]
我有一个包含许多列的 spark 数据框 'mydataframe'。我试图 运行 仅在两列上进行 kmeans:纬度和经度(纬度和经度),将它们用作简单值)。我想仅基于这 2 列提取 7 个簇,然后我想将簇分配附加到我的原始数据框。我试过:
from numpy import array
from math import sqrt
from pyspark.mllib.clustering import KMeans, KMeansModel
# Prepare a data frame with just 2 columns:
data = mydataframe.select('lat', 'long')
data_rdd = data.rdd # needs to be an RDD
data_rdd.cache()
# Build the model (cluster the data)
clusters = KMeans.train(data_rdd, 7, maxIterations=15, initializationMode="random")
但过了一会儿我收到错误消息:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 5191.0 failed 4 times, most recent failure: Lost task 1.3 in stage 5191.0 (TID 260738, 10.19.211.69, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last)
我已尝试分离并重新连接集群。同样的结果。我做错了什么?
因为,基于 sqrt
& array
, without ever using them, probably because it is like that in the docs example),让我提供更笼统的建议,而不是针对您在这里提出的具体问题(希望也能避免您随后再打开 3-4 个问题,尝试将您的集群分配返回到您的数据框中)...
自从
你的数据已经在数据框中了
您想将集群成员资格附加回您的初始 数据框
你没有理由恢复到 RDD 并使用 (soon to be deprecated) MLlib 包;使用(现在推荐的)ML 包,您将更轻松、优雅和高效地完成您的工作,它直接与数据帧一起工作。
第 0 步 - 制作一些类似于您的玩具数据:
spark.version
# u'2.2.0'
df = spark.createDataFrame([[0, 33.3, -17.5],
[1, 40.4, -20.5],
[2, 28., -23.9],
[3, 29.5, -19.0],
[4, 32.8, -18.84]
],
["other","lat", "long"])
df.show()
# +-----+----+------+
# |other| lat| long|
# +-----+----+------+
# | 0|33.3| -17.5|
# | 1|40.4| -20.5|
# | 2|28.0| -23.9|
# | 3|29.5| -19.0|
# | 4|32.8|-18.84|
# +-----+----+------+
步骤 1 - assemble 您的特征
与大多数 ML 包相比,Spark ML 要求将您的输入特征收集到数据框的 单列 中,通常命名为 features
;它提供了一个特定的方法来做到这一点,VectorAssembler
:
from pyspark.ml.feature import VectorAssembler
vecAssembler = VectorAssembler(inputCols=["lat", "long"], outputCol="features")
new_df = vecAssembler.transform(df)
new_df.show()
# +-----+----+------+-------------+
# |other| lat| long| features|
# +-----+----+------+-------------+
# | 0|33.3| -17.5| [33.3,-17.5]|
# | 1|40.4| -20.5| [40.4,-20.5]|
# | 2|28.0| -23.9| [28.0,-23.9]|
# | 3|29.5| -19.0| [29.5,-19.0]|
# | 4|32.8|-18.84|[32.8,-18.84]|
# +-----+----+------+-------------+
正如可能已经猜到的那样,参数 inputCols
用于告诉 VectoeAssembler
我们数据框中的哪些特定列将用作特征。
步骤 2 - 拟合您的 KMeans 模型
from pyspark.ml.clustering import KMeans
kmeans = KMeans(k=2, seed=1) # 2 clusters here
model = kmeans.fit(new_df.select('features'))
select('features')
在这里用于告诉算法将数据帧的哪一列用于聚类 - 请记住,在上面的第 1 步之后,您的原始 lat
和 long
特征没有更直接使用。
步骤 3 - 转换您的初始数据框以包含聚类分配
transformed = model.transform(new_df)
transformed.show()
# +-----+----+------+-------------+----------+
# |other| lat| long| features|prediction|
# +-----+----+------+-------------+----------+
# | 0|33.3| -17.5| [33.3,-17.5]| 0|
# | 1|40.4| -20.5| [40.4,-20.5]| 1|
# | 2|28.0| -23.9| [28.0,-23.9]| 0|
# | 3|29.5| -19.0| [29.5,-19.0]| 0|
# | 4|32.8|-18.84|[32.8,-18.84]| 0|
# +-----+----+------+-------------+----------+
transformed
数据框的最后一列 prediction
显示了聚类分配 - 在我的玩具箱中,我在聚类 #0 中有 4 条记录,在聚类 # 中有 1 条记录。 1.
您可以使用 select
语句进一步操作 transformed
数据框,甚至 drop
features
列(现在已经完成其功能,可能不再必要)...
希望你现在更接近你最初真正想要实现的目标。对于提取集群统计信息等,
尽管我给出了其他一般性回答,但如果您出于某种原因必须坚持使用 MLlib 和 RDD,以下是导致您使用相同玩具时出错的原因 df
。
当您将数据框中的 select
列转换为 RDD 时,结果是 行 :
df.select('lat', 'long').rdd.collect()
# [Row(lat=33.3, long=-17.5), Row(lat=40.4, long=-20.5), Row(lat=28.0, long=-23.9), Row(lat=29.5, long=-19.0), Row(lat=32.8, long=-18.84)]
不适合作为 MLlib KMeans 的输入。您需要 map
操作才能正常工作:
df.select('lat', 'long').rdd.map(lambda x: (x[0], x[1])).collect()
# [(33.3, -17.5), (40.4, -20.5), (28.0, -23.9), (29.5, -19.0), (32.8, -18.84)]
所以,你的代码应该是这样的:
from pyspark.mllib.clustering import KMeans, KMeansModel
rdd = df.select('lat', 'long').rdd.map(lambda x: (x[0], x[1]))
clusters = KMeans.train(rdd, 2, maxIterations=10, initializationMode="random") # works OK
clusters.centers
# [array([ 40.4, -20.5]), array([ 30.9 , -19.81])]