Pyspark calculate row-wise weighted average with null entries
I have multiple dataframes whose values are calculated from different source data. For simplicity I'll give an example with three dataframes, but I am looking for a solution that works with n dataframes:
data_1
+------+-----------+
|person|first_value|
+------+-----------+
| 1| 1.0|
| 2| 0.9|
| 3| 0.8|
| 4| 0.8|
+------+-----------+
data_2
+------+------------+
|person|second_value|
+------+------------+
| 1| 0.5|
| 2| 0.6|
| 4| 0.7|
+------+------------+
data_3
+------+-----------+
|person|third_value|
+------+-----------+
| 1| 0.2|
| 3| 0.9|
| 4| 0.6|
+------+-----------+
Now I want to calculate the weighted average over two or more dataframes - to do this I first join the dataframes:
+------+-----------+------------+-----------+
|person|first_value|second_value|third_value|
+------+-----------+------------+-----------+
| 1| 1.0| 0.5| 0.2|
| 2| 0.9| 0.6| null|
| 3| 0.8| null| 0.9|
| 4| 0.8| 0.7| 0.6|
+------+-----------+------------+-----------+
The formula for the combined value is:
val = val1 * weight1 + val2 * weight2 + val3 * weight3
However, if one of the values is null, the weights of the remaining values should still add up to 1 - so if val2 is null, weight2 should be distributed across all the other weights. I just can't find an elegant way to do this.
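In other words, for each row every weight should be rescaled by the sum of the weights of the columns that are non-null in that row:

weight_i_adj = weight_i / (sum of weight_j over all columns j that are not null)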
With w1 = 0.3, w2 = 0.4, w3 = 0.3, this is what my formula currently gives me:
+------+----+
|person| val|
+------+----+
| 3|null|
| 1|0.56|
| 4| 0.7|
| 2|null|
+------+----+
But this is what I want:
+------+-----+
|person| val|
+------+-----+
| 1| 0.56|
| 2|0.729| <- val1*weight1_adj2 + val2*weight2_adj2
| 3| 0.85| <- val1*weight1_adj3 + val3*weight3_adj3
| 4| 0.7|
+------+-----+
with the adjusted weights:
weight1_adj2 = w1/(w1+w2) = 0.43
weight2_adj2 = w2/(w1+w2) = 0.57
weight1_adj3 = w1/(w1+w3) = 0.5
weight3_adj3 = w3/(w1+w3) = 0.5
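For example, for person 2 (where third_value is null) this gives val = 0.9 * 0.43 + 0.6 * 0.57 ≈ 0.729, and for person 3 (where second_value is null) val = 0.8 * 0.5 + 0.9 * 0.5 = 0.85.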
Is there any way to solve this in pyspark or even sql, or do I have to resort to a udf?
Here is my current code, which does not handle the nulls:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

data1 = [("1", 1.0),
         ("2", 0.9),
         ("3", 0.8),
         ("4", 0.8)]
schema1 = ["person", "first_value"]
first_df = spark.createDataFrame(data=data1, schema=schema1)

data2 = [("1", 0.5),
         ("2", 0.6),
         ("4", 0.7)]
schema2 = ["person", "second_value"]
second_df = spark.createDataFrame(data=data2, schema=schema2)

data3 = [("1", 0.2),
         ("3", 0.9),
         ("4", 0.6)]
schema3 = ["person", "third_value"]
third_df = spark.createDataFrame(data=data3, schema=schema3)

# full outer join, so a person missing from one source keeps a null there
combined_df = first_df.join(
    second_df, ['person'], how='full'
).join(
    third_df, ['person'], how='full'
)

w1 = 0.3
w2 = 0.4
w3 = 0.3

combined_df.groupBy(['person']).agg(
    F.sum(
        col('first_value') * w1 + col('second_value') * w2 + col('third_value') * w3
    ).alias('val')).show()
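The nulls in val come from standard SQL null semantics: any arithmetic involving null evaluates to null, so a single missing value turns the whole weighted sum into null. A quick illustration:

# null propagates through * and +, so the whole expression is null
spark.sql("SELECT 0.9 * 0.3 + NULL * 0.4 AS val").show()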
Edit1: I am not asking about row-wise addition with null values, as described below - I need the weights to be handled so that the sum of the weights multiplied with the non-null values always adds up to 1.
The idea is to sum up, per row, all the weights of the columns that are not null, and then divide the individual weights by this sum.
To get some flexibility with respect to the number of columns and their weights, I store the weights in a dictionary, using the column names as keys:
weights = {"first_value": 0.3, "second_value": 0.4, "third_value": 0.3}
Then I can iterate over the dictionary to
- calculate the sum of the weights of the non-null columns
- and then calculate, over all non-null columns, the sum of
  value of column * weight / sum of weights
# weight_factor = 1 / (sum of the weights of the non-null columns)
wf = "1 / ("
# val = sum of (column value * weight * weight_factor) over the non-null columns
val = ""
for c in weights:  # 'c' rather than 'col', to avoid shadowing pyspark.sql.functions.col
    wf += f"if({c} is null, 0, {weights[c]}) + "
    val += f"if({c} is null, 0, {c} * {weights[c]} * weight_factor) + "
wf += "0 )"
val += "0"

combined_df = combined_df.withColumn("weight_factor", F.expr(wf)) \
    .withColumn("val", F.expr(val))
combined_df.show()
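With the example weights above, the two generated SQL expressions look like this (modulo whitespace):

1 / (if(first_value is null, 0, 0.3) + if(second_value is null, 0, 0.4) + if(third_value is null, 0, 0.3) + 0 )

if(first_value is null, 0, first_value * 0.3 * weight_factor) + if(second_value is null, 0, second_value * 0.4 * weight_factor) + if(third_value is null, 0, third_value * 0.3 * weight_factor) + 0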
Output:
+------+-----------+------------+-----------+-----------------+------------------+
|person|first_value|second_value|third_value| weight_factor| val|
+------+-----------+------------+-----------+-----------------+------------------+
| 1| 1.0| 0.5| 0.2|1.000000000000000| 0.56|
| 2| 0.9| 0.6| null|1.428571428571429|0.7285714285714289|
| 3| 0.8| null| 0.9|1.666666666666667|0.8500000000000002|
| 4| 0.8| 0.7| 0.6|1.000000000000000| 0.7|
+------+-----------+------------+-----------+-----------------+------------------+
As a next step, you can continue with the aggregation and sum up val.
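For example, reusing the groupBy from the question:

combined_df.groupBy(['person']).agg(
    F.sum('val').alias('val')
).show()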