Python / Pyspark - 如何用平均值替换一些单元格?
Python / Pyspark - How to replace some cells with averages?
我有一个很大的问题,我希望有人能帮助我。
我想用另一个值替换列中的单元格。
数据框看起来像:
----------------------------------------
|Timestamp | Item_ID | Price |
----------------------------------------
|2017-05-01 11:05:00 | 12345 | 70 |
|2017-05-01 17:20:00 | 98765 | 10 |
|2017-05-01 11:50:00 | 12345 | 20 |
|2017-05-01 19:50:00 | 12345 | 0 |
|2017-05-01 20:17:00 | 12345 | 0 |
|2017-05-01 22:01:00 | 98765 | 0 |
----------------------------------------
如您所见,随着时间的推移,相同商品的价格会有所不同。
例如商品“12345”有三个价格:70,20 和 0
现在我想用其他价格的平均值替换所有“0”。
这样的事情可能吗?
结果应该是:
对于项目 12345:(70+20)/2= 45
对于商品 98765:只有一个价格,就拿这个吧。
----------------------------------------
|Timestamp | Item_ID | Price |
----------------------------------------
|2017-05-01 11:05:00 | 12345 | 70 |
|2017-05-01 17:20:00 | 98765 | 10 |
|2017-05-01 11:50:00 | 12345 | 20 |
|2017-05-01 19:50:00 | 12345 | 45 |
|2017-05-01 20:17:00 | 12345 | 45 |
|2017-05-01 22:01:00 | 98765 | 10 |
----------------------------------------
非常感谢您,祝您有愉快的一天!
qwertz
这里有一个方法可以使用 sparkSQL
:
from StringIO import StringIO
import pandas as pd
# create dummy data
df = pd.DataFrame.from_csv(StringIO("""Timestamp|Item_ID|Price
2017-05-01 11:05:00|12345|70
2017-05-01 17:20:00|98765|10
2017-05-01 11:50:00|12345|20
2017-05-01 19:50:00|12345|0
2017-05-01 20:17:00|12345|0
2017-05-01 22:01:00|98765|0""".replace("\s+", '')), sep="|").reset_index()
df['Timestamp'] = df['Timestamp'].astype(str)
spark_df = sqlCtx.createDataFrame(df)
spark_df.registerTempTable('table')
sqlCtx.sql("""SELECT Timestamp,
l.Item_ID,
CASE WHEN l.Price > 0 THEN l.Price ELSE r.Price END AS Price
FROM table l
LEFT JOIN (
SELECT Item_ID,
AVG(Price) AS Price
FROM table
WHERE Price > 0
GROUP BY Item_ID
) r ON l.Item_ID = r.Item_ID""".replace("\n", ' ')
).show()
输出:
+-------------------+-------+-----+
|Timestamp |Item_ID|Price|
+-------------------+-------+-----+
|2017-05-01 19:50:00|12345 |45.0 |
|2017-05-01 20:17:00|12345 |45.0 |
|2017-05-01 11:05:00|12345 |70.0 |
|2017-05-01 11:50:00|12345 |20.0 |
|2017-05-01 17:20:00|98765 |10.0 |
|2017-05-01 22:01:00|98765 |10.0 |
+-------------------+-------+-----+
解释:
通过调用 spark_df.registerTempTable('table')
,我将 spark DataFrame
注册为 SQLContext
(我将其命名为 table
)中的临时 table。我是 运行 的查询是使用 Item_ID
将 table
连接到自身,但一侧将具有聚合(平均)值。然后我使用 CASE
语句 select 给定值,或者如果 Price
是 0
.
的聚合值
我调用了 .replace("\n", " ")
因为不支持换行符(我相信它们被视为 EOF
)。这是一种编写可读查询的简单方法,无需将其全部放在一行中。
备注
您描述的技术是均值插补。由于这在该领域很常见,我不得不相信还有另一种(可能更好)的方法可以仅使用 spark DataFrame
函数(避免 SQL
)来做到这一点。
我有一个很大的问题,我希望有人能帮助我。 我想用另一个值替换列中的单元格。
数据框看起来像:
----------------------------------------
|Timestamp | Item_ID | Price |
----------------------------------------
|2017-05-01 11:05:00 | 12345 | 70 |
|2017-05-01 17:20:00 | 98765 | 10 |
|2017-05-01 11:50:00 | 12345 | 20 |
|2017-05-01 19:50:00 | 12345 | 0 |
|2017-05-01 20:17:00 | 12345 | 0 |
|2017-05-01 22:01:00 | 98765 | 0 |
----------------------------------------
如您所见,随着时间的推移,相同商品的价格会有所不同。 例如商品“12345”有三个价格:70,20 和 0 现在我想用其他价格的平均值替换所有“0”。 这样的事情可能吗?
结果应该是: 对于项目 12345:(70+20)/2= 45 对于商品 98765:只有一个价格,就拿这个吧。
----------------------------------------
|Timestamp | Item_ID | Price |
----------------------------------------
|2017-05-01 11:05:00 | 12345 | 70 |
|2017-05-01 17:20:00 | 98765 | 10 |
|2017-05-01 11:50:00 | 12345 | 20 |
|2017-05-01 19:50:00 | 12345 | 45 |
|2017-05-01 20:17:00 | 12345 | 45 |
|2017-05-01 22:01:00 | 98765 | 10 |
----------------------------------------
非常感谢您,祝您有愉快的一天! qwertz
这里有一个方法可以使用 sparkSQL
:
from StringIO import StringIO
import pandas as pd
# create dummy data
df = pd.DataFrame.from_csv(StringIO("""Timestamp|Item_ID|Price
2017-05-01 11:05:00|12345|70
2017-05-01 17:20:00|98765|10
2017-05-01 11:50:00|12345|20
2017-05-01 19:50:00|12345|0
2017-05-01 20:17:00|12345|0
2017-05-01 22:01:00|98765|0""".replace("\s+", '')), sep="|").reset_index()
df['Timestamp'] = df['Timestamp'].astype(str)
spark_df = sqlCtx.createDataFrame(df)
spark_df.registerTempTable('table')
sqlCtx.sql("""SELECT Timestamp,
l.Item_ID,
CASE WHEN l.Price > 0 THEN l.Price ELSE r.Price END AS Price
FROM table l
LEFT JOIN (
SELECT Item_ID,
AVG(Price) AS Price
FROM table
WHERE Price > 0
GROUP BY Item_ID
) r ON l.Item_ID = r.Item_ID""".replace("\n", ' ')
).show()
输出:
+-------------------+-------+-----+
|Timestamp |Item_ID|Price|
+-------------------+-------+-----+
|2017-05-01 19:50:00|12345 |45.0 |
|2017-05-01 20:17:00|12345 |45.0 |
|2017-05-01 11:05:00|12345 |70.0 |
|2017-05-01 11:50:00|12345 |20.0 |
|2017-05-01 17:20:00|98765 |10.0 |
|2017-05-01 22:01:00|98765 |10.0 |
+-------------------+-------+-----+
解释:
通过调用 spark_df.registerTempTable('table')
,我将 spark DataFrame
注册为 SQLContext
(我将其命名为 table
)中的临时 table。我是 运行 的查询是使用 Item_ID
将 table
连接到自身,但一侧将具有聚合(平均)值。然后我使用 CASE
语句 select 给定值,或者如果 Price
是 0
.
我调用了 .replace("\n", " ")
因为不支持换行符(我相信它们被视为 EOF
)。这是一种编写可读查询的简单方法,无需将其全部放在一行中。
备注
您描述的技术是均值插补。由于这在该领域很常见,我不得不相信还有另一种(可能更好)的方法可以仅使用 spark DataFrame
函数(避免 SQL
)来做到这一点。