PySpark ML:获取 KMeans 集群统计信息
PySpark ML: Get KMeans cluster statistics
我已经构建了一个 KMeansModel。我的结果存储在一个名为
transformed
。
(a) 如何解释 transformed
的内容?
(b) 如何从 transformed
创建一个或多个 Pandas DataFrame 以显示 14 个集群中每个集群的 13 个特征中每个特征的汇总统计数据?
from pyspark.ml.clustering import KMeans
# Trains a k-means model.
kmeans = KMeans().setK(14).setSeed(1)
model = kmeans.fit(X_spark_scaled) # Fits a model to the input dataset with optional parameters.
transformed = model.transform(X_spark_scaled).select("features", "prediction") # X_spark_scaled is my PySpark DataFrame consisting of 13 features
transformed.show(5, truncate = False)
+------------------------------------------------------------------------------------------------------------------------------------+----------+
|features |prediction|
+------------------------------------------------------------------------------------------------------------------------------------+----------+
|(14,[4,5,7,8,9,13],[1.0,1.0,485014.0,0.25,2.0,1.0]) |12 |
|(14,[2,7,8,9,12,13],[1.0,2401233.0,1.0,1.0,1.0,1.0]) |2 |
|(14,[2,4,5,7,8,9,13],[0.3333333333333333,0.6666666666666666,0.6666666666666666,2429111.0,0.9166666666666666,1.3333333333333333,3.0])|2 |
|(14,[4,5,7,8,9,12,13],[1.0,1.0,2054748.0,0.15384615384615385,11.0,1.0,1.0]) |11 |
|(14,[2,7,8,9,13],[1.0,43921.0,1.0,1.0,1.0]) |1 |
+------------------------------------------------------------------------------------------------------------------------------------+----------+
only showing top 5 rows
顺便说一句,我从另一个 SO post 中发现我可以将这些功能映射到它们的名称,如下所示。在一个或多个 Pandas 数据帧中为每个集群的每个特征提供汇总统计数据(平均值、中位数、标准差、最小值、最大值)会很好。
attr_list = [attr for attr in chain(*transformed.schema['features'].metadata['ml_attr']['attrs'].values())]
attr_list
根据评论中的要求,这里有一个包含 2 条数据记录的快照(不想提供太多记录 -- 此处为专有信息)
+---------------------+------------------------+-----------------------+----------------------+----------------------+------------------------------+---------------------------------+------------+-------------------+--------------------+------------------------------------+--------------------------+-------------------------------+-----------------+--------------------+--------------------+
|device_type_robot_pct|device_type_smart_tv_pct|device_type_desktop_pct|device_type_tablet_pct|device_type_mobile_pct|device_type_mobile_persist_pct|visitors_seen_with_anonymiser_pct|ip_time_span| ip_weight|mean_ips_per_visitor|visitors_seen_with_multi_country_pct|international_visitors_pct|visitors_seen_with_multi_ua_pct|count_tuids_on_ip| features| scaledFeatures|
+---------------------+------------------------+-----------------------+----------------------+----------------------+------------------------------+---------------------------------+------------+-------------------+--------------------+------------------------------------+--------------------------+-------------------------------+-----------------+--------------------+--------------------+
| 0.0| 0.0| 0.0| 0.0| 1.0| 1.0| 0.0| 485014.0| 0.25| 2.0| 0.0| 0.0| 0.0| 1.0|(14,[4,5,7,8,9,13...|(14,[4,5,7,8,9,13...|
| 0.0| 0.0| 1.0| 0.0| 0.0| 0.0| 0.0| 2401233.0| 1.0| 1.0| 0.0| 0.0| 1.0| 1.0|(14,[2,7,8,9,12,1...|(14,[2,7,8,9,12,1...|
正如 Anony-Mousse 评论的那样,(Py)Spark ML 确实比 scikit-learn 或其他类似包 更受限制,而且此类功能并非微不足道;尽管如此,这里有一种方法可以得到你想要的(集群统计):
spark.version
# u'2.2.0'
from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors
# toy data - 5-d features including sparse vectors
df = spark.createDataFrame(
[(Vectors.sparse(5,[(0, 164.0),(1,520.0)]), 1.0),
(Vectors.dense([519.0,2723.0,0.0,3.0,4.0]), 1.0),
(Vectors.sparse(5,[(0, 2868.0), (1, 928.0)]), 1.0),
(Vectors.sparse(5,[(0, 57.0), (1, 2715.0)]), 0.0),
(Vectors.dense([1241.0,2104.0,0.0,0.0,2.0]), 1.0)],
["features", "target"])
df.show()
# +--------------------+------+
# | features|target|
# +--------------------+------+
# |(5,[0,1],[164.0,5...| 1.0|
# |[519.0,2723.0,0.0...| 1.0|
# |(5,[0,1],[2868.0,...| 1.0|
# |(5,[0,1],[57.0,27...| 0.0|
# |[1241.0,2104.0,0....| 1.0|
# +--------------------+------+
kmeans = KMeans(k=3, seed=1)
model = kmeans.fit(df.select('features'))
transformed = model.transform(df).select("features", "prediction")
transformed.show()
# +--------------------+----------+
# | features|prediction|
# +--------------------+----------+
# |(5,[0,1],[164.0,5...| 1|
# |[519.0,2723.0,0.0...| 2|
# |(5,[0,1],[2868.0,...| 0|
# |(5,[0,1],[57.0,27...| 2|
# |[1241.0,2104.0,0....| 2|
# +--------------------+----------+
到此为止,关于您的第一个问题:
How do I interpret the contents of transformed
?
features
列只是原始数据中同一列的复制。
prediction
列是相应数据记录所属的簇;在我的示例中,有 5 条数据记录和 k=3
个集群,我最终在集群 #0 中有 1 条记录,在集群 #1 中有 1 条记录,在集群 #2 中有 3 条记录。
关于你的第二个问题:
How do I create one or more Pandas DataFrame from transformed
that would show summary statistics for each of the 13 features for each of the 14 clusters?
(注意:您似乎有 14 个特征而不是 13...)
这是一个看似简单的任务的一个很好的例子,不幸的是,PySpark 没有提供现成的功能 - 尤其是因为所有功能都分组在一个 单个 向量中 features
;为此,我们必须首先 "disassemble" features
,有效地提出 VectorAssembler
.
的 反转 操作
我目前能想到的唯一方法是暂时恢复到 RDD 并执行 map
操作 [编辑:这不是真正必要的 - 请参阅下面的更新];这是上面我的集群 #2 的示例,其中包含密集和稀疏向量:
# keep only cluster #2:
cl_2 = transformed.filter(transformed.prediction==2)
cl_2.show()
# +--------------------+----------+
# | features|prediction|
# +--------------------+----------+
# |[519.0,2723.0,0.0...| 2|
# |(5,[0,1],[57.0,27...| 2|
# |[1241.0,2104.0,0....| 2|
# +--------------------+----------+
# set the data dimensionality as a parameter:
dimensionality = 5
cluster_2 = cl_2.drop('prediction').rdd.map(lambda x: [float(x[0][i]) for i in range(dimensionality)]).toDF(schema=['x'+str(i) for i in range(dimensionality)])
cluster_2.show()
# +------+------+---+---+---+
# | x0| x1| x2| x3| x4|
# +------+------+---+---+---+
# | 519.0|2723.0|0.0|3.0|4.0|
# | 57.0|2715.0|0.0|0.0|0.0|
# |1241.0|2104.0|0.0|0.0|2.0|
# +------+------+---+---+---+
(如果您的初始数据位于 Spark 数据帧 initial_data
中,您可以将最后一部分更改为 toDF(schema=initial_data.columns)
,以保留原始特征名称。)
从这一点开始,您可以将 cluster_2
数据帧转换为 pandas 数据帧(如果它适合您的记忆),或者使用 Spark 数据帧的 describe()
函数来获取您的汇总统计数据:
cluster_2.describe().show()
# result:
+-------+-----------------+-----------------+---+------------------+---+
|summary| x0| x1| x2| x3| x4|
+-------+-----------------+-----------------+---+------------------+---+
| count| 3| 3| 3| 3| 3|
| mean|605.6666666666666| 2514.0|0.0| 1.0|2.0|
| stddev|596.7389155512932|355.0929455790413|0.0|1.7320508075688772|2.0|
| min| 57.0| 2104.0|0.0| 0.0|0.0|
| max| 1241.0| 2723.0|0.0| 3.0|4.0|
+-------+-----------------+-----------------+---+------------------+---+
在您的情况下将上述代码与 dimensionality=14
一起使用应该可以完成工作...
对 mean
和 stddev
中所有这些(可以说是无用的)有效数字感到恼火?作为奖励,这是我想出的一个小效用函数 some time ago 作为一个漂亮的总结:
def prettySummary(df):
""" Neat summary statistics of a Spark dataframe
Args:
pyspark.sql.dataframe.DataFrame (df): input dataframe
Returns:
pandas.core.frame.DataFrame: a pandas dataframe with the summary statistics of df
"""
import pandas as pd
temp = df.describe().toPandas()
temp.iloc[1:3,1:] = temp.iloc[1:3,1:].convert_objects(convert_numeric=True)
pd.options.display.float_format = '{:,.2f}'.format
return temp
stats_df = prettySummary(cluster_2)
stats_df
# result:
summary x0 x1 x2 x3 x4
0 count 3 3 3 3 3
1 mean 605.67 2,514.00 0.00 1.00 2.00
2 stddev 596.74 355.09 0.00 1.73 2.00
3 min 57.0 2104.0 0.0 0.0 0.0
4 max 1241.0 2723.0 0.0 3.0 4.0
UPDATE: 再次思考,看到你的示例数据,我想出了一个更直接的解决方案,不需要调用中间 RDD(一个操作如果可能的话,可以说更愿意避免)...
关键观察是 transformed
的完整内容,即 没有 select
语句;保持与上面相同的玩具数据集,我们得到:
transformed = model.transform(df) # no 'select' statements
transformed.show()
# +--------------------+------+----------+
# | features|target|prediction|
# +--------------------+------+----------+
# |(5,[0,1],[164.0,5...| 1.0| 1|
# |[519.0,2723.0,0.0...| 1.0| 2|
# |(5,[0,1],[2868.0,...| 1.0| 0|
# |(5,[0,1],[57.0,27...| 0.0| 2|
# |[1241.0,2104.0,0....| 1.0| 2|
# +--------------------+------+----------+
如您所见,无论数据帧 df
中是否存在其他要转换的列(在我的例子中只有一列 - target
),只需 "pass-through" 转换过程并结束-up 出现在最终结果中...
希望您开始理解:如果 df
包含您最初的 14 个特征,每个特征在一个单独的列中,加上名为 features
的第 15 列(大致如示例数据所示,但没有最后一列),然后是以下代码:
kmeans = KMeans().setK(14)
model = kmeans.fit(df.select('features'))
transformed = model.transform(df).drop('features')
将为您留下一个包含 15 列的 Spark 数据框 transformed
,即您最初的 14 个特征加上具有相应簇号的 prediction
列。
从这一点开始,您可以按照我上面显示的方式继续 transformed
中的 filter
个特定集群并获得您的摘要统计信息,但您将避免(昂贵的...)转换为中间临时 RDD,从而使您的所有操作保持在更高效的 Spark 数据帧上下文中...
我已经构建了一个 KMeansModel。我的结果存储在一个名为
transformed
。
(a) 如何解释 transformed
的内容?
(b) 如何从 transformed
创建一个或多个 Pandas DataFrame 以显示 14 个集群中每个集群的 13 个特征中每个特征的汇总统计数据?
from pyspark.ml.clustering import KMeans
# Trains a k-means model.
kmeans = KMeans().setK(14).setSeed(1)
model = kmeans.fit(X_spark_scaled) # Fits a model to the input dataset with optional parameters.
transformed = model.transform(X_spark_scaled).select("features", "prediction") # X_spark_scaled is my PySpark DataFrame consisting of 13 features
transformed.show(5, truncate = False)
+------------------------------------------------------------------------------------------------------------------------------------+----------+
|features |prediction|
+------------------------------------------------------------------------------------------------------------------------------------+----------+
|(14,[4,5,7,8,9,13],[1.0,1.0,485014.0,0.25,2.0,1.0]) |12 |
|(14,[2,7,8,9,12,13],[1.0,2401233.0,1.0,1.0,1.0,1.0]) |2 |
|(14,[2,4,5,7,8,9,13],[0.3333333333333333,0.6666666666666666,0.6666666666666666,2429111.0,0.9166666666666666,1.3333333333333333,3.0])|2 |
|(14,[4,5,7,8,9,12,13],[1.0,1.0,2054748.0,0.15384615384615385,11.0,1.0,1.0]) |11 |
|(14,[2,7,8,9,13],[1.0,43921.0,1.0,1.0,1.0]) |1 |
+------------------------------------------------------------------------------------------------------------------------------------+----------+
only showing top 5 rows
顺便说一句,我从另一个 SO post 中发现我可以将这些功能映射到它们的名称,如下所示。在一个或多个 Pandas 数据帧中为每个集群的每个特征提供汇总统计数据(平均值、中位数、标准差、最小值、最大值)会很好。
attr_list = [attr for attr in chain(*transformed.schema['features'].metadata['ml_attr']['attrs'].values())]
attr_list
根据评论中的要求,这里有一个包含 2 条数据记录的快照(不想提供太多记录 -- 此处为专有信息)
+---------------------+------------------------+-----------------------+----------------------+----------------------+------------------------------+---------------------------------+------------+-------------------+--------------------+------------------------------------+--------------------------+-------------------------------+-----------------+--------------------+--------------------+
|device_type_robot_pct|device_type_smart_tv_pct|device_type_desktop_pct|device_type_tablet_pct|device_type_mobile_pct|device_type_mobile_persist_pct|visitors_seen_with_anonymiser_pct|ip_time_span| ip_weight|mean_ips_per_visitor|visitors_seen_with_multi_country_pct|international_visitors_pct|visitors_seen_with_multi_ua_pct|count_tuids_on_ip| features| scaledFeatures|
+---------------------+------------------------+-----------------------+----------------------+----------------------+------------------------------+---------------------------------+------------+-------------------+--------------------+------------------------------------+--------------------------+-------------------------------+-----------------+--------------------+--------------------+
| 0.0| 0.0| 0.0| 0.0| 1.0| 1.0| 0.0| 485014.0| 0.25| 2.0| 0.0| 0.0| 0.0| 1.0|(14,[4,5,7,8,9,13...|(14,[4,5,7,8,9,13...|
| 0.0| 0.0| 1.0| 0.0| 0.0| 0.0| 0.0| 2401233.0| 1.0| 1.0| 0.0| 0.0| 1.0| 1.0|(14,[2,7,8,9,12,1...|(14,[2,7,8,9,12,1...|
正如 Anony-Mousse 评论的那样,(Py)Spark ML 确实比 scikit-learn 或其他类似包 更受限制,而且此类功能并非微不足道;尽管如此,这里有一种方法可以得到你想要的(集群统计):
spark.version
# u'2.2.0'
from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors
# toy data - 5-d features including sparse vectors
df = spark.createDataFrame(
[(Vectors.sparse(5,[(0, 164.0),(1,520.0)]), 1.0),
(Vectors.dense([519.0,2723.0,0.0,3.0,4.0]), 1.0),
(Vectors.sparse(5,[(0, 2868.0), (1, 928.0)]), 1.0),
(Vectors.sparse(5,[(0, 57.0), (1, 2715.0)]), 0.0),
(Vectors.dense([1241.0,2104.0,0.0,0.0,2.0]), 1.0)],
["features", "target"])
df.show()
# +--------------------+------+
# | features|target|
# +--------------------+------+
# |(5,[0,1],[164.0,5...| 1.0|
# |[519.0,2723.0,0.0...| 1.0|
# |(5,[0,1],[2868.0,...| 1.0|
# |(5,[0,1],[57.0,27...| 0.0|
# |[1241.0,2104.0,0....| 1.0|
# +--------------------+------+
kmeans = KMeans(k=3, seed=1)
model = kmeans.fit(df.select('features'))
transformed = model.transform(df).select("features", "prediction")
transformed.show()
# +--------------------+----------+
# | features|prediction|
# +--------------------+----------+
# |(5,[0,1],[164.0,5...| 1|
# |[519.0,2723.0,0.0...| 2|
# |(5,[0,1],[2868.0,...| 0|
# |(5,[0,1],[57.0,27...| 2|
# |[1241.0,2104.0,0....| 2|
# +--------------------+----------+
到此为止,关于您的第一个问题:
How do I interpret the contents of
transformed
?
features
列只是原始数据中同一列的复制。
prediction
列是相应数据记录所属的簇;在我的示例中,有 5 条数据记录和 k=3
个集群,我最终在集群 #0 中有 1 条记录,在集群 #1 中有 1 条记录,在集群 #2 中有 3 条记录。
关于你的第二个问题:
How do I create one or more Pandas DataFrame from
transformed
that would show summary statistics for each of the 13 features for each of the 14 clusters?
(注意:您似乎有 14 个特征而不是 13...)
这是一个看似简单的任务的一个很好的例子,不幸的是,PySpark 没有提供现成的功能 - 尤其是因为所有功能都分组在一个 单个 向量中 features
;为此,我们必须首先 "disassemble" features
,有效地提出 VectorAssembler
.
我目前能想到的唯一方法是暂时恢复到 RDD 并执行 map
操作 [编辑:这不是真正必要的 - 请参阅下面的更新];这是上面我的集群 #2 的示例,其中包含密集和稀疏向量:
# keep only cluster #2:
cl_2 = transformed.filter(transformed.prediction==2)
cl_2.show()
# +--------------------+----------+
# | features|prediction|
# +--------------------+----------+
# |[519.0,2723.0,0.0...| 2|
# |(5,[0,1],[57.0,27...| 2|
# |[1241.0,2104.0,0....| 2|
# +--------------------+----------+
# set the data dimensionality as a parameter:
dimensionality = 5
cluster_2 = cl_2.drop('prediction').rdd.map(lambda x: [float(x[0][i]) for i in range(dimensionality)]).toDF(schema=['x'+str(i) for i in range(dimensionality)])
cluster_2.show()
# +------+------+---+---+---+
# | x0| x1| x2| x3| x4|
# +------+------+---+---+---+
# | 519.0|2723.0|0.0|3.0|4.0|
# | 57.0|2715.0|0.0|0.0|0.0|
# |1241.0|2104.0|0.0|0.0|2.0|
# +------+------+---+---+---+
(如果您的初始数据位于 Spark 数据帧 initial_data
中,您可以将最后一部分更改为 toDF(schema=initial_data.columns)
,以保留原始特征名称。)
从这一点开始,您可以将 cluster_2
数据帧转换为 pandas 数据帧(如果它适合您的记忆),或者使用 Spark 数据帧的 describe()
函数来获取您的汇总统计数据:
cluster_2.describe().show()
# result:
+-------+-----------------+-----------------+---+------------------+---+
|summary| x0| x1| x2| x3| x4|
+-------+-----------------+-----------------+---+------------------+---+
| count| 3| 3| 3| 3| 3|
| mean|605.6666666666666| 2514.0|0.0| 1.0|2.0|
| stddev|596.7389155512932|355.0929455790413|0.0|1.7320508075688772|2.0|
| min| 57.0| 2104.0|0.0| 0.0|0.0|
| max| 1241.0| 2723.0|0.0| 3.0|4.0|
+-------+-----------------+-----------------+---+------------------+---+
在您的情况下将上述代码与 dimensionality=14
一起使用应该可以完成工作...
对 mean
和 stddev
中所有这些(可以说是无用的)有效数字感到恼火?作为奖励,这是我想出的一个小效用函数 some time ago 作为一个漂亮的总结:
def prettySummary(df):
""" Neat summary statistics of a Spark dataframe
Args:
pyspark.sql.dataframe.DataFrame (df): input dataframe
Returns:
pandas.core.frame.DataFrame: a pandas dataframe with the summary statistics of df
"""
import pandas as pd
temp = df.describe().toPandas()
temp.iloc[1:3,1:] = temp.iloc[1:3,1:].convert_objects(convert_numeric=True)
pd.options.display.float_format = '{:,.2f}'.format
return temp
stats_df = prettySummary(cluster_2)
stats_df
# result:
summary x0 x1 x2 x3 x4
0 count 3 3 3 3 3
1 mean 605.67 2,514.00 0.00 1.00 2.00
2 stddev 596.74 355.09 0.00 1.73 2.00
3 min 57.0 2104.0 0.0 0.0 0.0
4 max 1241.0 2723.0 0.0 3.0 4.0
UPDATE: 再次思考,看到你的示例数据,我想出了一个更直接的解决方案,不需要调用中间 RDD(一个操作如果可能的话,可以说更愿意避免)...
关键观察是 transformed
的完整内容,即 没有 select
语句;保持与上面相同的玩具数据集,我们得到:
transformed = model.transform(df) # no 'select' statements
transformed.show()
# +--------------------+------+----------+
# | features|target|prediction|
# +--------------------+------+----------+
# |(5,[0,1],[164.0,5...| 1.0| 1|
# |[519.0,2723.0,0.0...| 1.0| 2|
# |(5,[0,1],[2868.0,...| 1.0| 0|
# |(5,[0,1],[57.0,27...| 0.0| 2|
# |[1241.0,2104.0,0....| 1.0| 2|
# +--------------------+------+----------+
如您所见,无论数据帧 df
中是否存在其他要转换的列(在我的例子中只有一列 - target
),只需 "pass-through" 转换过程并结束-up 出现在最终结果中...
希望您开始理解:如果 df
包含您最初的 14 个特征,每个特征在一个单独的列中,加上名为 features
的第 15 列(大致如示例数据所示,但没有最后一列),然后是以下代码:
kmeans = KMeans().setK(14)
model = kmeans.fit(df.select('features'))
transformed = model.transform(df).drop('features')
将为您留下一个包含 15 列的 Spark 数据框 transformed
,即您最初的 14 个特征加上具有相应簇号的 prediction
列。
从这一点开始,您可以按照我上面显示的方式继续 transformed
中的 filter
个特定集群并获得您的摘要统计信息,但您将避免(昂贵的...)转换为中间临时 RDD,从而使您的所有操作保持在更高效的 Spark 数据帧上下文中...