数据框中列的几何平均值
Geometric mean of columns in dataframe
我使用此代码计算数据框中所有行的几何平均值:
from pyspark.sql.functions import rand, randn, sqrt
df = sqlContext.range(0, 10)
df = df.select(rand(seed=10).alias("c1"), randn(seed=27).alias("c2"))
df.show()
newdf = df.withColumn('total', sqrt(sum(df[col] for col in df.columns)))
newdf.show()
这显示:
要计算列而不是行的几何平均值,我认为这段代码应该足够了:
newdf = df.withColumn('total', sqrt(sum(df[row] for row in df.rows)))
但这会引发错误:NameError: global name 'row' is not defined
所以看来访问列的 api 与访问行不同。
我是否应该格式化数据以将行转换为列,然后重新使用工作算法:newdf = df.withColumn('total', sqrt(sum(df[col] for col in df.columns)))
或者是否有按原样处理行和列的解决方案?
我不确定你对几何平均数的定义是否正确。根据Wikipedia,几何平均数定义为n个数的乘积的n次方根。根据同一页,几何平均数也可以表示为对数的算术平均数的指数。我将使用它来计算每列的几何平均值。
您可以计算几何平均值,方法是将 c1
和 c2
的列数据合并到名为 value
的新列中,将源列名称存储在 column
中.数据重新格式化后,通过按 column
(c1
或 c2
)分组并计算每组对数值的算术平均值的指数来确定几何平均值。在此计算中 NaN
值被忽略。
from pyspark.sql import functions as F
df = sqlContext.range(0, 10)
df = df.select(F.rand(seed=10).alias("c1"), F.randn(seed=27).alias("c2"))
df_id = df.withColumn("id", F.monotonically_increasing_id())
kvp = F.explode(F.array([F.struct(F.lit(c).alias("column"), F.col(c).alias("value")) for c in df.columns])).alias("kvp")
df_pivoted = df_id.select(['id'] + [kvp]).select(['id'] + ["kvp.column", "kvp.value"])
df_geometric_mean = df_pivoted.groupBy(['column']).agg(F.exp(F.avg(F.log(df_pivoted.value))))
df_geometric_mean.withColumnRenamed("EXP(avg(LOG(value)))", "geometric_mean").show()
这个returns:
+------+-------------------+
|column| geometric_mean|
+------+-------------------+
| c1|0.25618961513533134|
| c2| 0.415119290980354|
+------+-------------------+
这些几何意味着,除了它们的精度之外,如果 NaN 值也被忽略,则与几何平均值 return 相匹配 scipy。
from scipy.stats.mstats import gmean
c1=[x['c1'] for x in df.collect() if x['c1']>0]
c2=[x['c2'] for x in df.collect() if x['c2']>0]
print 'c1 : {0}\r\nc2 : {1}'.format(gmean(c1),gmean(c2))
此代码段 returns:
| c1|0.256189615135|
| c2|0.41511929098|
我使用此代码计算数据框中所有行的几何平均值:
from pyspark.sql.functions import rand, randn, sqrt
df = sqlContext.range(0, 10)
df = df.select(rand(seed=10).alias("c1"), randn(seed=27).alias("c2"))
df.show()
newdf = df.withColumn('total', sqrt(sum(df[col] for col in df.columns)))
newdf.show()
这显示:
要计算列而不是行的几何平均值,我认为这段代码应该足够了:
newdf = df.withColumn('total', sqrt(sum(df[row] for row in df.rows)))
但这会引发错误:NameError: global name 'row' is not defined
所以看来访问列的 api 与访问行不同。
我是否应该格式化数据以将行转换为列,然后重新使用工作算法:newdf = df.withColumn('total', sqrt(sum(df[col] for col in df.columns)))
或者是否有按原样处理行和列的解决方案?
我不确定你对几何平均数的定义是否正确。根据Wikipedia,几何平均数定义为n个数的乘积的n次方根。根据同一页,几何平均数也可以表示为对数的算术平均数的指数。我将使用它来计算每列的几何平均值。
您可以计算几何平均值,方法是将 c1
和 c2
的列数据合并到名为 value
的新列中,将源列名称存储在 column
中.数据重新格式化后,通过按 column
(c1
或 c2
)分组并计算每组对数值的算术平均值的指数来确定几何平均值。在此计算中 NaN
值被忽略。
from pyspark.sql import functions as F
df = sqlContext.range(0, 10)
df = df.select(F.rand(seed=10).alias("c1"), F.randn(seed=27).alias("c2"))
df_id = df.withColumn("id", F.monotonically_increasing_id())
kvp = F.explode(F.array([F.struct(F.lit(c).alias("column"), F.col(c).alias("value")) for c in df.columns])).alias("kvp")
df_pivoted = df_id.select(['id'] + [kvp]).select(['id'] + ["kvp.column", "kvp.value"])
df_geometric_mean = df_pivoted.groupBy(['column']).agg(F.exp(F.avg(F.log(df_pivoted.value))))
df_geometric_mean.withColumnRenamed("EXP(avg(LOG(value)))", "geometric_mean").show()
这个returns:
+------+-------------------+
|column| geometric_mean|
+------+-------------------+
| c1|0.25618961513533134|
| c2| 0.415119290980354|
+------+-------------------+
这些几何意味着,除了它们的精度之外,如果 NaN 值也被忽略,则与几何平均值 return 相匹配 scipy。
from scipy.stats.mstats import gmean
c1=[x['c1'] for x in df.collect() if x['c1']>0]
c2=[x['c2'] for x in df.collect() if x['c2']>0]
print 'c1 : {0}\r\nc2 : {1}'.format(gmean(c1),gmean(c2))
此代码段 returns:
| c1|0.256189615135|
| c2|0.41511929098|