pyspark 根据 ID 计算四分位数并根据四分位数范围进行分类

pyspark calculate quartiles based on ID and classify based on the quartile ranges

我正在使用 pyspark 1.5.2。我有一个带有列 "ID" 和 "Height" 的 pyspark 数据框,如下所示:

|           ID1|           ID2|       height|
---------------------------------------------
|             1|           000|           44|
|             2|           000|         72.9|
|             3|           000|           89|
|             4|           000|         45.5|
|             5|           000|         52.3|
|             6|           000|         87.9|
|             7|           000|         63.1|
|             8|           000|         26.1|
|             9|           000|           97|
|            10|           000|          120|
|            11|           000|           99|
|            12|           000|           96|
|            13|           000|         36.5|
|            14|           000|            0|
|            15|           001|           48|
|            16|           001|        152.1|
|            17|           001|         72.2|
|            18|           001|         21.5|
|            19|           001|           94|
|            20|           001|          220|
+--------------+--------------+-------------+

我想计算每个 "ID2" 身高的四分位数,并根据以下标准将他们分为高、中、矮:

Short: All height < Q1
Medium: All height within inclusive inter-quartile range (IQR) Q3-Q1
Tall: All height > Q3

我查看了 pyspark.sql 模块并找到了一个 summary() 函数,我可以用它计算四分位数范围,但它不适用于基于列 "ID2" 的 groupby。最终结果将是:

|       ID1 |     Height |
-------------------------
|          1|        Tall|
|          2|       Short|
|          3|      Medium|  and so on

我该怎么做?有没有更好的方法或更简单的方法?

我对此很陌生,非常感谢任何帮助!

提前致谢!

在处理函数中传递您的输入数据帧,它将return返回作者预期的输出数据帧。 分位数函数 returns 既包含范围内的值,又插入原始数据帧。然后可以 运行 顶部的比较器。

import pandas as pd
import numpy as np

def quantile(x):
    x['first']= np.percentile(x['height'], 25)
    x['third']= np.percentile(x['height'], 75)
    return  x

def process(input_df):
    grouped_df = input_df.groupby(['ID2']).apply(quantile)
    grouped_df.loc[(grouped_df.height >= grouped_df.first) & (grouped_df.height <= grouped_df.third), 'result'] = 'Medium'
    grouped_df.loc[(grouped_df.height < grouped_df.first) , 'result'] = 'Short'
    grouped_df.loc[ (grouped_df.height > grouped_df.third),'result'] = 'Tall'
    return grouped_df


main_df = pd.read_csv('sample.csv')
print process(main_df)

这是一种不使用 pyspark 1.5.2 pandas 的方法。 "input_df" 是这里的原始数据框:

input_df.registerTempTable("input_df")
quartile_df = sqlContext.sql("select id2, percentile_approx(cast(height as decimal), 0.25) as Q1_value, percentile_approx(cast(height as decimal), 0.5) as Q2_value, percentile_approx(cast(height as decimal), 0.75) as Q3_value from input_df group by id2")
input_df=input_df.join(quartile_df, input_df.id2 == quartile_df.id2, 'left_outer')
input_df.select(F.when(input_df.height < input_df.Q1_value, 'short').when(input_df.height.between(input_df.Q1_value, input_df.Q3_value), 'medium').when(input_df.height > input_df.Q3_value, 'tall').alias('height_tag')).show()

上面的代码基本上将 input_df 注册为临时 table 并允许通过 SQL 进行查询。在查询中,使用 percentile_approx 给出 Q1、Q2 和 Q3 分别为 25%、50% 和 75%,最终与原始数据框连接。在最终的 LOC 中,"height" 的每个值根据条件分为短、中或高。

希望对您有所帮助