Calculate the mode of a PySpark DataFrame column?
Ultimately what I want is the mode of a column, for all columns in the DataFrame. For other summary statistics, I see a couple of options: use DataFrame aggregation, or map the columns of the DataFrame to an RDD of vectors (something I am also having trouble doing) and use colStats from MLlib. But I don't see mode as an option there.
A problem with mode is pretty much the same as with median: while it is easy to compute, the computation is rather expensive. It can be done either using sort followed by local and global aggregations, or using just-another-wordcount and a filter:
import numpy as np
from pyspark.sql.functions import col, max as max_

np.random.seed(1)

# Example data: 10,000 random integers in [0, 50)
df = sc.parallelize([
    (int(x), ) for x in np.random.randint(50, size=10000)
]).toDF(["x"])

# Count each value, then keep a row whose count equals the maximum count
cnts = df.groupBy("x").count()
mode = cnts.join(
    cnts.agg(max_("count").alias("max_")), col("count") == col("max_")
).limit(1).select("x")
mode.first()[0]
## 0
Either way it may require a full shuffle for each column.
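As a rough sketch (not part of the original answer), the same count-and-filter pattern could be wrapped in a small helper and applied to every column in a loop; the helper name below is illustrative only:

from pyspark.sql.functions import col, max as max_

def column_mode(df, c):
    # Count-and-filter approach from above, applied to a single column
    cnts = df.groupBy(c).count()
    return cnts.join(
        cnts.agg(max_("count").alias("max_")), col("count") == col("max_")
    ).limit(1).select(c).first()[0]

# One full pass (and shuffle) per column
modes = {c: column_mode(df, c) for c in df.columns}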
You can compute the column mode using Java code as follows:
case MODE:
    // Count occurrences of each value in the column
    Dataset<Row> cnts = ds.groupBy(column).count();
    // Keep the row(s) whose count equals the maximum count
    Dataset<Row> dsMode = cnts.join(
        cnts.agg(functions.max("count").alias("max_")),
        functions.col("count").equalTo(functions.col("max_")));
    Dataset<Row> mode = dsMode.limit(1).select(column);
    replaceValue = ((GenericRowWithSchema) mode.first()).values()[0];
    // Replace nulls in the column with the mode
    ds = replaceWithValue(ds, column, replaceValue);
    break;

private static Dataset<Row> replaceWithValue(Dataset<Row> ds, String column, Object replaceValue) {
    return ds.withColumn(column,
        functions.coalesce(functions.col(column), functions.lit(replaceValue)));
}
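For comparison, a rough PySpark sketch of the same null-replacement idea (the column name "x" and the variable names here are illustrative, not from the original answer):

from pyspark.sql.functions import coalesce, col, lit

# Compute the mode of column "x", then fill nulls in "x" with it
mode_value = (df.groupBy("x").count()
                .orderBy("count", ascending=False)
                .first()[0])
df_filled = df.withColumn("x", coalesce(col("x"), lit(mode_value)))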
>>> df=newdata.groupBy('columnName').count()
>>> mode = df.orderBy(df['count'].desc()).collect()[0][0]
See my result below:
>>> newdata.groupBy('var210').count().show()
+------+-----+
|var210|count|
+------+-----+
| 3av_| 64|
| 7A3j| 509|
| g5HH| 1489|
| oT7d| 109|
| DM_V| 149|
| uKAI|44883|
+------+-----+
# store the above result in df
>>> df=newdata.groupBy('var210').count()
>>> df.orderBy(df['count'].desc()).collect()
[Row(var210='uKAI', count=44883),
Row(var210='g5HH', count=1489),
Row(var210='7A3j', count=509),
Row(var210='DM_V', count=149),
Row(var210='oT7d', count=109),
Row(var210='3av_', count=64)]
# get the first value using collect()
>>> mode = df.orderBy(df['count'].desc()).collect()[0][0]
>>> mode
'uKAI'
Using the groupBy() function gets the count of each category in the column. df is my result DataFrame with two columns, var210 and count. Using orderBy() on the 'count' column in descending order puts the maximum value in the first row of the DataFrame. collect()[0][0] is used to get the first tuple from the DataFrame.
This line will give you the mode of "col" in the Spark DataFrame df:
df.groupby("col").count().orderBy("count", ascending=False).first()[0]
To get a list of the modes of all columns in df, use:
[df.groupby(i).count().orderBy("count", ascending=False).first()[0] for i in df.columns]
To add names identifying which column each mode belongs to, make a 2D list:
[[i,df.groupby(i).count().orderBy("count", ascending=False).first()[0]] for i in df.columns]
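As an illustrative variant (not from the original answer), the same pairs can be collected into a plain dict keyed by column name:

# Hypothetical convenience: map each column name to its mode
modes = {c: df.groupby(c).count().orderBy("count", ascending=False).first()[0]
         for c in df.columns}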
The following method can help you get the mode of all columns of an input DataFrame:
from pyspark.sql.functions import monotonically_increasing_id

def get_mode(df):
    column_lst = df.columns
    # One count-ordered DataFrame per column
    res = [df.select(i).groupby(i).count().orderBy("count", ascending=False) for i in column_lst]
    # Start with the mode of the first column, plus a temporary join key
    df_mode = res[0].limit(1).select(column_lst[0]).withColumn("temp_name_monotonically_increasing_id", monotonically_increasing_id())
    # Join in the mode of every remaining column on the shared key
    for i in range(1, len(res)):
        df2 = res[i].limit(1).select(column_lst[i]).withColumn("temp_name_monotonically_increasing_id", monotonically_increasing_id())
        df_mode = df_mode.join(df2, (df_mode.temp_name_monotonically_increasing_id == df2.temp_name_monotonically_increasing_id)).drop(df2.temp_name_monotonically_increasing_id)
    return df_mode.drop("temp_name_monotonically_increasing_id")
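A quick usage sketch (the data and column names here are made up for illustration, and an active SparkSession named spark is assumed):

data = spark.createDataFrame(
    [(1, "a"), (1, "b"), (2, "b"), (1, "b")],
    ["num", "letter"])
get_mode(data).show()  # expected: one row with the mode of each column (1, "b")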