Problem using 'window' function to group by day in PySpark
I have a dataset that I need to resample. To do that, I need to group it by day while computing the median per sensor. I am using the window function, but it only returns a single row.
Here is the dataset:
+--------+-------------+-------------------+------+------------------+
|Variable| Sensor Name| Timestamp| Units| Value|
+--------+-------------+-------------------+------+------------------+
| NO2|aq_monitor914|2018-10-07 23:15:00|ugm -3|0.9945200000000001|
| NO2|aq_monitor914|2018-10-07 23:30:00|ugm -3|1.1449200000000002|
| NO2|aq_monitor914|2018-10-07 23:45:00|ugm -3| 1.13176|
| NO2|aq_monitor914|2018-10-08 00:00:00|ugm -3| 0.9212|
| NO2|aq_monitor914|2018-10-08 00:15:00|ugm -3| 1.39872|
| NO2|aq_monitor914|2018-10-08 00:30:00|ugm -3| 1.51528|
| NO2|aq_monitor914|2018-10-08 00:45:00|ugm -3| 1.61116|
| NO2|aq_monitor914|2018-10-08 01:00:00|ugm -3| 1.59612|
| NO2|aq_monitor914|2018-10-08 01:15:00|ugm -3| 1.12612|
| NO2|aq_monitor914|2018-10-08 01:30:00|ugm -3| 1.04528|
+--------+-------------+-------------------+------+------------------+
I need to resample by day, computing the median of the "Value" column for each day. I am using the following code to do this:
import pyspark.sql.functions as psf
from pyspark.sql.functions import window

magic_percentile = psf.expr('percentile_approx(Value, 0.5)')  # approximate median of the 'Value' column
data = data.groupby('Variable', 'Sensor Name', window('Timestamp', '1 day')).agg(magic_percentile.alias('Value'))
However, and here is the problem, this only returns the following DataFrame:
+--------+-------------+--------------------+-------+
|Variable| Sensor Name| window| Value|
+--------+-------------+--------------------+-------+
| NO2|aq_monitor914|[2018-10-07 21:00...|1.13176|
+--------+-------------+--------------------+-------+
Expanding the 'window' column:
window=Row(start=datetime.datetime(2018, 10, 7, 21, 0), end=datetime.datetime(2018, 10, 8, 21, 0))
From my understanding of window, it should bucket each timestamp into a one-day window, so that for example:
2018-10-07 23:15:00
should become:
2018-10-07
and then group the readings by Variable, Sensor Name, and day, and compute the median for each group. I am really confused about how to do this.
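To make the expected result concrete, here is a plain-Python sketch (no Spark) that groups the sample readings from the table above by calendar day and takes the exact median per day; it is only meant to show what the desired per-day aggregation would produce:

```python
from datetime import datetime
from statistics import median
from collections import defaultdict

# (timestamp, value) pairs taken from the sample dataset in the question.
rows = [
    ("2018-10-07 23:15:00", 0.9945200000000001),
    ("2018-10-07 23:30:00", 1.1449200000000002),
    ("2018-10-07 23:45:00", 1.13176),
    ("2018-10-08 00:00:00", 0.9212),
    ("2018-10-08 00:15:00", 1.39872),
    ("2018-10-08 00:30:00", 1.51528),
    ("2018-10-08 00:45:00", 1.61116),
    ("2018-10-08 01:00:00", 1.59612),
    ("2018-10-08 01:15:00", 1.12612),
    ("2018-10-08 01:30:00", 1.04528),
]

# Group values by calendar day (the analogue of to_date in Spark).
by_day = defaultdict(list)
for ts, value in rows:
    day = datetime.strptime(ts, "%Y-%m-%d %H:%M:%S").date()
    by_day[day].append(value)

# Exact median per day.
medians = {str(day): median(vals) for day, vals in sorted(by_day.items())}
print(medians)
# → {'2018-10-07': 1.13176, '2018-10-08': 1.39872}
```
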
I believe you do not need window to achieve what you want. You would need it, for example, if you wanted to compute some aggregate over the days preceding each given date. In your case, it is enough to parse the datetime column and use it in the groupBy statement. A working example is given below, hope this helps!
import pyspark.sql.functions as psf

df = sqlContext.createDataFrame(
    [
        ('NO2', 'aq_monitor914', '2018-10-07 23:15:00', 0.9945200000000001),
        ('NO2', 'aq_monitor914', '2018-10-07 23:30:00', 1.1449200000000002),
        ('NO2', 'aq_monitor914', '2018-10-07 23:45:00', 1.13176),
        ('NO2', 'aq_monitor914', '2018-10-08 00:00:00', 0.9212),
        ('NO2', 'aq_monitor914', '2018-10-08 00:15:00', 1.39872),
        ('NO2', 'aq_monitor914', '2018-10-08 00:30:00', 1.51528)
    ],
    ("Variable", "Sensor Name", "Timestamp", "Value")
)

# Parse the timestamp string into a proper timestamp column.
df = df.withColumn('Timestamp', psf.to_timestamp('Timestamp', 'yyyy-MM-dd HH:mm:ss'))
df.show()

# Approximate median per sensor and calendar day: to_date truncates the
# timestamp to its date, which becomes the grouping key.
magic_percentile = psf.expr('percentile_approx(Value, 0.5)')
df_agg = df.groupBy('Variable', 'Sensor Name', psf.to_date('Timestamp').alias('Day')).agg(magic_percentile.alias('Value'))
df_agg.show()
Input:
+--------+-------------+-------------------+------------------+
|Variable| Sensor Name| Timestamp| Value|
+--------+-------------+-------------------+------------------+
| NO2|aq_monitor914|2018-10-07 23:15:00|0.9945200000000001|
| NO2|aq_monitor914|2018-10-07 23:30:00|1.1449200000000002|
| NO2|aq_monitor914|2018-10-07 23:45:00| 1.13176|
| NO2|aq_monitor914|2018-10-08 00:00:00| 0.9212|
| NO2|aq_monitor914|2018-10-08 00:15:00| 1.39872|
| NO2|aq_monitor914|2018-10-08 00:30:00| 1.51528|
+--------+-------------+-------------------+------------------+
Output:
+--------+-------------+----------+-------+
|Variable| Sensor Name| Day| Value|
+--------+-------------+----------+-------+
| NO2|aq_monitor914|2018-10-07|1.13176|
| NO2|aq_monitor914|2018-10-08|1.39872|
+--------+-------------+----------+-------+
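One detail worth knowing: percentile_approx is an approximate percentile that returns an actual value observed in the column rather than interpolating between two values. The following plain-Python sketch (the helper name is hypothetical, not a Spark API) mimics that element-picking behavior:

```python
import math

def percentile_element(values, p):
    # Hypothetical helper: return the element at the p-th percentile
    # position of the sorted values. Like Spark's percentile_approx,
    # it always returns a value that occurs in the input, never an
    # interpolated average of two neighbors.
    ordered = sorted(values)
    idx = max(int(math.ceil(p * len(ordered))) - 1, 0)
    return ordered[idx]

# Day 2018-10-08 values from the answer's six-row example:
day2 = [0.9212, 1.39872, 1.51528]
print(percentile_element(day2, 0.5))  # → 1.39872, matching the output above
```
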