如何在不影响索引列的情况下计算Spark数据框中记录的出现频率并将其作为新列添加到数据框中?
How can count occurrence frequency of records in Spark data frame and add it as new column to data frame without affecting on index column?
我正在尝试将名为 Freq
的新列添加到给定的 spark 数据帧 而不影响索引列 或记录的帧顺序以分配回结果数据帧中右侧 row/incident/event/record 的统计频率(即计数)。
这是我的数据框:
+---+-------------+------+------------+-------------+-----------------+
| id| Type|Length|Token_number|Encoding_type|Character_feature|
+---+-------------+------+------------+-------------+-----------------+
| 0| Sentence| 4014| 198| false| 136|
| 1| contextid| 90| 2| false| 15|
| 2| Sentence| 172| 11| false| 118|
| 3| String| 12| 0| true| 11|
| 4|version-style| 16| 0| false| 13|
| 5| Sentence| 339| 42| false| 110|
| 6|version-style| 16| 0| false| 13|
| 7| url_variable| 10| 2| false| 9|
| 8| url_variable| 10| 2| false| 9|
| 9| Sentence| 172| 11| false| 117|
| 10| contextid| 90| 2| false| 15|
| 11| Sentence| 170| 11| false| 114|
| 12|version-style| 16| 0| false| 13|
| 13| Sentence| 68| 10| false| 59|
| 14| String| 12| 0| true| 11|
| 15| Sentence| 173| 11| false| 118|
| 16| String| 12| 0| true| 11|
| 17| Sentence| 132| 8| false| 96|
| 18| String| 12| 0| true| 11|
| 19| contextid| 88| 2| false| 15|
+---+-------------+------+------------+-------------+-----------------+
由于索引列 id
:
的存在,我尝试了以下脚本但未成功
from pyspark.sql import functions as F
from pyspark.sql import Window
bo = features_sdf.select('id', 'Type', 'Length', 'Token_number', 'Encoding_type', 'Character_feature')
sdf2 = (
bo.na.fill(0).withColumn(
'Freq',
F.count("*").over(Window.partitionBy(bo.columns))
).withColumn(
'MaxFreq',
F.max('Freq').over(Window.partitionBy())
).withColumn(
'MinFreq',
F.min('Freq').over(Window.partitionBy())
)
)
sdf2.show()
#bad result due to existence of id column which makes every record unique and causes Freq=1
+---+-------------+------+------------+-------------+-----------------+----+-------+-------+
| id| Type|Length|Token_number|Encoding_type|Character_feature|Freq|MaxFreq|MinFreq|
+---+-------------+------+------------+-------------+-----------------+----+-------+-------+
| 0| Sentence| 4014| 198| false| 136| 1| 1| 1|
| 1| contextid| 90| 2| false| 15| 1| 1| 1|
| 2| Sentence| 172| 11| false| 118| 1| 1| 1|
| 3| String| 12| 0| true| 11| 1| 1| 1|
| 4|version-style| 16| 0| false| 13| 1| 1| 1|
| 5| Sentence| 339| 42| false| 110| 1| 1| 1|
| 6|version-style| 16| 0| false| 13| 1| 1| 1|
| 7| url_variable| 10| 2| false| 9| 1| 1| 1|
| 8| url_variable| 10| 2| false| 9| 1| 1| 1|
| 9| Sentence| 172| 11| false| 117| 1| 1| 1|
| 10| contextid| 90| 2| false| 15| 1| 1| 1|
| 11| Sentence| 170| 11| false| 114| 1| 1| 1|
| 12|version-style| 16| 0| false| 13| 1| 1| 1|
| 13| Sentence| 68| 10| false| 59| 1| 1| 1|
| 14| String| 12| 0| true| 11| 1| 1| 1|
| 15| Sentence| 173| 11| false| 118| 1| 1| 1|
| 16| String| 12| 0| true| 11| 1| 1| 1|
| 17| Sentence| 132| 8| false| 96| 1| 1| 1|
| 18| String| 12| 0| true| 11| 1| 1| 1|
| 19| contextid| 88| 2| false| 15| 1| 1| 1|
+---+-------------+------+------------+-------------+-----------------+----+-------+-------+
如果我排除索引列 id
代码可以工作但不知何故它会弄乱顺序(由于不需要的 sorting/ordering)并且结果不会分配给右边 record/row如下:
+--------+------+------------+-------------+-----------------+----+-------+-------+
| Type|Length|Token_number|Encoding_type|Character_feature|Freq|MaxFreq|MinFreq|
+--------+------+------------+-------------+-----------------+----+-------+-------+
|Sentence| 7| 1| false| 6| 2| 1665| 1|
|Sentence| 7| 1| false| 6| 2| 1665| 1|
|Sentence| 17| 4| false| 14| 6| 1665| 1|
|Sentence| 17| 4| false| 14| 6| 1665| 1|
|Sentence| 17| 4| false| 14| 6| 1665| 1|
|Sentence| 17| 4| false| 14| 6| 1665| 1|
|Sentence| 17| 4| false| 14| 6| 1665| 1|
|Sentence| 17| 4| false| 14| 6| 1665| 1|
|Sentence| 19| 4| false| 17| 33| 1665| 1|
|Sentence| 19| 4| false| 17| 33| 1665| 1|
|Sentence| 19| 4| false| 17| 33| 1665| 1|
|Sentence| 19| 4| false| 17| 33| 1665| 1|
|Sentence| 19| 4| false| 17| 33| 1665| 1|
|Sentence| 19| 4| false| 17| 33| 1665| 1|
|Sentence| 19| 4| false| 17| 33| 1665| 1|
|Sentence| 19| 4| false| 17| 33| 1665| 1|
|Sentence| 19| 4| false| 17| 33| 1665| 1|
|Sentence| 19| 4| false| 17| 33| 1665| 1|
|Sentence| 19| 4| false| 17| 33| 1665| 1|
|Sentence| 19| 4| false| 17| 33| 1665| 1|
+--------+------+------------+-------------+-----------------+----+-------+-------+
最后,我想添加这个函数,并使用简单的数学公式将其归一化在 0
和 1
之间,并将其用作新功能。在规范化期间,我也遇到了问题并获得 null
值。我已经实现了 pythonic 版本,它非常简单,但我受够了 spark:
#Statistical Preprocessing
def add_freq_to_features(df):
frequencies_df = df.groupby(list(df.columns)).size().to_frame().rename(columns={0: "Freq"})
frequencies_df["Freq"] = frequencies_df["Freq"] / frequencies_df["Freq"].sum() # Normalzing 0 & 1
new_df = pd.merge(df, frequencies_df, how='left', on=list(df.columns))
return new_df
# Apply frequency allocation and merge with extracted features df
features_df = add_freq_to_features(oba)
features_df.head(20)
如我所料,结果如下:
我也尝试过使用 df.groupBy(df.columns).count()
乱码翻译 pythonic 脚本,但我不能:
# this is to build "raw" Freq based on @pltc answer
sdf2 = (sdf
.groupBy(sdf.columns)
.agg(F.count('*').alias('Freq'))
.withColumn('Encoding_type', F.col('Encoding_type').cast('string'))
)
sdf2.cache().count()
sdf2.show()
这里是 PySpark 的完整代码,我们在这个 colab notebook 中根据@ggordon 的回答尝试了简化示例:
def add_freq_to_features_(df):
from pyspark.sql import functions as F
from pyspark.sql import Window
sdf_pltc = df.select('id', 'Type', 'Length', 'Token_number', 'Encoding_type', 'Character_feature')
print("Before Any Modification") # only included for debugging purposes
sdf_pltc.show(5,truncate=0)
# fill missing values with 0 using `na.fill(0)` before applying count as window function
sdf2 = (
sdf_pltc.na.fill(0).withColumn(
'Freq',
F.count("*").over(Window.partitionBy(sdf_pltc.columns))
).withColumn(
'MaxFreq',
F.max('Freq').over(Window.partitionBy())
).withColumn(
'MinFreq',
F.min('Freq').over(Window.partitionBy())
)
.withColumn('id' , F.col('id'))
)
print("After replacing null with 0 and counting by partitions") # only included for debugging purposes
# use orderby as your last operation, only included here for debugging purposes
#sdf2 = sdf2.orderBy(F.col('Type').desc(),F.col('Length').desc() )
sdf2.show(5,truncate=False) # only included for debugging purposes
sdf2 = (
sdf2.withColumn('Freq' , F.when(
F.col('MaxFreq')==0.000000000 , 0
).otherwise(
(F.col('Freq')-F.col('MinFreq')) / (F.col('MaxFreq') - F.col('MinFreq'))
)
) # Normalzing between 0 & 1
)
sdf2 = sdf2.drop('MinFreq').drop('MaxFreq')
sdf2 = sdf2.withColumn('Encoding_type', F.col('Encoding_type').cast('string'))
#sdf2 = sdf2.orderBy(F.col('Type').desc(),F.col('Length').desc() )
print("After normalization, encoding transformation and order by ") # only included for debugging purposes
sdf2.show(50,truncate=False)
return sdf2
遗憾的是,由于要处理 BigData,我无法用 df.toPandas()
破解它,它很便宜并且导致 OOM error。
任何帮助将不胜感激。
pandas 行为不同,因为 ID 字段是 DataFrame 索引,因此它不计入您所做的“按所有人分组”。您只需更改一处即可在 Spark 中获得相同的行为。
partitionBy 采用任何普通的字符串列表,尝试从您的分区键列表中删除 id 列,如下所示:
bo = features_sdf.select('id', 'Type', 'Length', 'Token_number', 'Encoding_type', 'Character_feature')
partition_columns = bo.columns.remove('id')
sdf2 = (
bo.na.fill(0).withColumn(
'Freq',
F.count("*").over(Window.partitionBy(partition_columns))
).withColumn(
'MaxFreq',
F.max('Freq').over(Window.partitionBy())
).withColumn(
'MinFreq',
F.min('Freq').over(Window.partitionBy())
)
)
这将为您提供您所说有效的结果,但保留 ID 字段。您需要弄清楚如何对频率进行除法,但这应该可以帮助您入门。
我正在尝试将名为 Freq
的新列添加到给定的 spark 数据帧 而不影响索引列 或记录的帧顺序以分配回结果数据帧中右侧 row/incident/event/record 的统计频率(即计数)。
这是我的数据框:
+---+-------------+------+------------+-------------+-----------------+
| id| Type|Length|Token_number|Encoding_type|Character_feature|
+---+-------------+------+------------+-------------+-----------------+
| 0| Sentence| 4014| 198| false| 136|
| 1| contextid| 90| 2| false| 15|
| 2| Sentence| 172| 11| false| 118|
| 3| String| 12| 0| true| 11|
| 4|version-style| 16| 0| false| 13|
| 5| Sentence| 339| 42| false| 110|
| 6|version-style| 16| 0| false| 13|
| 7| url_variable| 10| 2| false| 9|
| 8| url_variable| 10| 2| false| 9|
| 9| Sentence| 172| 11| false| 117|
| 10| contextid| 90| 2| false| 15|
| 11| Sentence| 170| 11| false| 114|
| 12|version-style| 16| 0| false| 13|
| 13| Sentence| 68| 10| false| 59|
| 14| String| 12| 0| true| 11|
| 15| Sentence| 173| 11| false| 118|
| 16| String| 12| 0| true| 11|
| 17| Sentence| 132| 8| false| 96|
| 18| String| 12| 0| true| 11|
| 19| contextid| 88| 2| false| 15|
+---+-------------+------+------------+-------------+-----------------+
由于索引列 id
:
from pyspark.sql import functions as F
from pyspark.sql import Window
bo = features_sdf.select('id', 'Type', 'Length', 'Token_number', 'Encoding_type', 'Character_feature')
sdf2 = (
bo.na.fill(0).withColumn(
'Freq',
F.count("*").over(Window.partitionBy(bo.columns))
).withColumn(
'MaxFreq',
F.max('Freq').over(Window.partitionBy())
).withColumn(
'MinFreq',
F.min('Freq').over(Window.partitionBy())
)
)
sdf2.show()
#bad result due to existence of id column which makes every record unique and causes Freq=1
+---+-------------+------+------------+-------------+-----------------+----+-------+-------+
| id| Type|Length|Token_number|Encoding_type|Character_feature|Freq|MaxFreq|MinFreq|
+---+-------------+------+------------+-------------+-----------------+----+-------+-------+
| 0| Sentence| 4014| 198| false| 136| 1| 1| 1|
| 1| contextid| 90| 2| false| 15| 1| 1| 1|
| 2| Sentence| 172| 11| false| 118| 1| 1| 1|
| 3| String| 12| 0| true| 11| 1| 1| 1|
| 4|version-style| 16| 0| false| 13| 1| 1| 1|
| 5| Sentence| 339| 42| false| 110| 1| 1| 1|
| 6|version-style| 16| 0| false| 13| 1| 1| 1|
| 7| url_variable| 10| 2| false| 9| 1| 1| 1|
| 8| url_variable| 10| 2| false| 9| 1| 1| 1|
| 9| Sentence| 172| 11| false| 117| 1| 1| 1|
| 10| contextid| 90| 2| false| 15| 1| 1| 1|
| 11| Sentence| 170| 11| false| 114| 1| 1| 1|
| 12|version-style| 16| 0| false| 13| 1| 1| 1|
| 13| Sentence| 68| 10| false| 59| 1| 1| 1|
| 14| String| 12| 0| true| 11| 1| 1| 1|
| 15| Sentence| 173| 11| false| 118| 1| 1| 1|
| 16| String| 12| 0| true| 11| 1| 1| 1|
| 17| Sentence| 132| 8| false| 96| 1| 1| 1|
| 18| String| 12| 0| true| 11| 1| 1| 1|
| 19| contextid| 88| 2| false| 15| 1| 1| 1|
+---+-------------+------+------------+-------------+-----------------+----+-------+-------+
如果我排除索引列 id
代码可以工作但不知何故它会弄乱顺序(由于不需要的 sorting/ordering)并且结果不会分配给右边 record/row如下:
+--------+------+------------+-------------+-----------------+----+-------+-------+
| Type|Length|Token_number|Encoding_type|Character_feature|Freq|MaxFreq|MinFreq|
+--------+------+------------+-------------+-----------------+----+-------+-------+
|Sentence| 7| 1| false| 6| 2| 1665| 1|
|Sentence| 7| 1| false| 6| 2| 1665| 1|
|Sentence| 17| 4| false| 14| 6| 1665| 1|
|Sentence| 17| 4| false| 14| 6| 1665| 1|
|Sentence| 17| 4| false| 14| 6| 1665| 1|
|Sentence| 17| 4| false| 14| 6| 1665| 1|
|Sentence| 17| 4| false| 14| 6| 1665| 1|
|Sentence| 17| 4| false| 14| 6| 1665| 1|
|Sentence| 19| 4| false| 17| 33| 1665| 1|
|Sentence| 19| 4| false| 17| 33| 1665| 1|
|Sentence| 19| 4| false| 17| 33| 1665| 1|
|Sentence| 19| 4| false| 17| 33| 1665| 1|
|Sentence| 19| 4| false| 17| 33| 1665| 1|
|Sentence| 19| 4| false| 17| 33| 1665| 1|
|Sentence| 19| 4| false| 17| 33| 1665| 1|
|Sentence| 19| 4| false| 17| 33| 1665| 1|
|Sentence| 19| 4| false| 17| 33| 1665| 1|
|Sentence| 19| 4| false| 17| 33| 1665| 1|
|Sentence| 19| 4| false| 17| 33| 1665| 1|
|Sentence| 19| 4| false| 17| 33| 1665| 1|
+--------+------+------------+-------------+-----------------+----+-------+-------+
最后,我想添加这个函数,并使用简单的数学公式将其归一化在 0
和 1
之间,并将其用作新功能。在规范化期间,我也遇到了问题并获得 null
值。我已经实现了 pythonic 版本,它非常简单,但我受够了 spark:
#Statistical Preprocessing
def add_freq_to_features(df):
frequencies_df = df.groupby(list(df.columns)).size().to_frame().rename(columns={0: "Freq"})
frequencies_df["Freq"] = frequencies_df["Freq"] / frequencies_df["Freq"].sum() # Normalzing 0 & 1
new_df = pd.merge(df, frequencies_df, how='left', on=list(df.columns))
return new_df
# Apply frequency allocation and merge with extracted features df
features_df = add_freq_to_features(oba)
features_df.head(20)
如我所料,结果如下:
我也尝试过使用 df.groupBy(df.columns).count()
乱码翻译 pythonic 脚本,但我不能:
# this is to build "raw" Freq based on @pltc answer
sdf2 = (sdf
.groupBy(sdf.columns)
.agg(F.count('*').alias('Freq'))
.withColumn('Encoding_type', F.col('Encoding_type').cast('string'))
)
sdf2.cache().count()
sdf2.show()
这里是 PySpark 的完整代码,我们在这个 colab notebook 中根据@ggordon 的回答尝试了简化示例:
def add_freq_to_features_(df):
from pyspark.sql import functions as F
from pyspark.sql import Window
sdf_pltc = df.select('id', 'Type', 'Length', 'Token_number', 'Encoding_type', 'Character_feature')
print("Before Any Modification") # only included for debugging purposes
sdf_pltc.show(5,truncate=0)
# fill missing values with 0 using `na.fill(0)` before applying count as window function
sdf2 = (
sdf_pltc.na.fill(0).withColumn(
'Freq',
F.count("*").over(Window.partitionBy(sdf_pltc.columns))
).withColumn(
'MaxFreq',
F.max('Freq').over(Window.partitionBy())
).withColumn(
'MinFreq',
F.min('Freq').over(Window.partitionBy())
)
.withColumn('id' , F.col('id'))
)
print("After replacing null with 0 and counting by partitions") # only included for debugging purposes
# use orderby as your last operation, only included here for debugging purposes
#sdf2 = sdf2.orderBy(F.col('Type').desc(),F.col('Length').desc() )
sdf2.show(5,truncate=False) # only included for debugging purposes
sdf2 = (
sdf2.withColumn('Freq' , F.when(
F.col('MaxFreq')==0.000000000 , 0
).otherwise(
(F.col('Freq')-F.col('MinFreq')) / (F.col('MaxFreq') - F.col('MinFreq'))
)
) # Normalzing between 0 & 1
)
sdf2 = sdf2.drop('MinFreq').drop('MaxFreq')
sdf2 = sdf2.withColumn('Encoding_type', F.col('Encoding_type').cast('string'))
#sdf2 = sdf2.orderBy(F.col('Type').desc(),F.col('Length').desc() )
print("After normalization, encoding transformation and order by ") # only included for debugging purposes
sdf2.show(50,truncate=False)
return sdf2
遗憾的是,由于要处理 BigData,我无法用 df.toPandas()
破解它,它很便宜并且导致 OOM error。
任何帮助将不胜感激。
pandas 行为不同,因为 ID 字段是 DataFrame 索引,因此它不计入您所做的“按所有人分组”。您只需更改一处即可在 Spark 中获得相同的行为。
partitionBy 采用任何普通的字符串列表,尝试从您的分区键列表中删除 id 列,如下所示:
bo = features_sdf.select('id', 'Type', 'Length', 'Token_number', 'Encoding_type', 'Character_feature')
partition_columns = bo.columns.remove('id')
sdf2 = (
bo.na.fill(0).withColumn(
'Freq',
F.count("*").over(Window.partitionBy(partition_columns))
).withColumn(
'MaxFreq',
F.max('Freq').over(Window.partitionBy())
).withColumn(
'MinFreq',
F.min('Freq').over(Window.partitionBy())
)
)
这将为您提供您所说有效的结果,但保留 ID 字段。您需要弄清楚如何对频率进行除法,但这应该可以帮助您入门。