如何对时间序列大数据进行重采样(Downsample),从10Hz(毫秒)想转换成1Hz(秒)使用pyspark
How to resample (Downsample) the time series big data, from 10 Hz (miliseconds) wants to convert to 1 Hz (seconds) using pyspark
我正在使用 pyspark 处理时间序列大数据,我的数据以 GB(100 GB 或更多)为单位,行数以百万或数十亿为单位。我是使用 pyspark 的大数据新手。想要重新采样(下采样)数据原始数据以毫秒为单位的时间戳为 10 Hz 我想以秒为单位将此数据转换为 1 Hz。如果你能给我一些想法,那将非常有帮助。如果你能向我推荐任何我可以用来使用 spark 处理(大数据)大数据的 documentation/solution 也很好。以下是示例数据。 DF=
start_timestamp
end_timestamp
value
2020-11-05 03:25:02.088
2020-11-05 04:10:19.288
0.0
2020-11-05 04:24:25.288
2020-11-05 04:24:25.218
0.4375
2020-11-05 04:24:25.218
2020-11-05 04:24:25.318
1.0625
2020-11-05 04:24:25.318
2020-11-05 04:24:25.418
1.21875
2020-11-05 04:24:25.418
2020-11-05 04:24:25.518
1.234375
2020-11-05 04:24:25.518
2020-11-05 04:24:25.618
1.265625
2020-11-05 04:24:25.618
2020-11-05 04:24:25.718
1.28125
我尝试了我获得的代码:
这是我的示例代码:
day = 1 #60 * 60 * 24
epoch = (col("start_timestamp").cast("bigint") / day).cast("bigint") * day
with_epoch = distinctDF.withColumn("epoch", epoch)
min_epoch, max_epoch = with_epoch.select(min_("epoch"), max_("epoch")).first()
ref = spark.range(
min_epoch, max_epoch + 1, day
).toDF("epoch")
(ref
.join(with_epoch, "epoch", "left")
.orderBy("epoch")
.withColumn("start_timestamp_resampled", timestamp_seconds("epoch"))
.show(15, False))
代码正在运行,但我不确定它是否正确:输出如下所示。但是它是否在列中显示空值。
epoch
start_timestamp
end_timestamp
value
start_timestamp_resampled
1604546702
2020-11-05 03:25:02.088
2020-11-05 04:10:19.288
0.0
2020-11-05 03:25:02
1604546703
null
null
null
2020-11-05 03:25:03
1604546704
null
null
null
2020-11-05 03:25:04
1604546705
null
null
null
2020-11-05 03:25:05
1604546706
null
null
null
2020-11-05 03:25:06
1604546707
null
null
null
2020-11-05 03:25:07
缩减采样时,您必须考虑如何处理丢失的数据。
使用联接,您将仅在时间戳匹配时获取数据。但您也可以决定使用以下方法聚合数据点:平均值、最大值、最小值、总和...
我的做法:
import pyspark.sql.functions as F
df = df.withColumn("Timestamp_resampled", F.date_trunc(timestamp, format='yyyy-MM-dd HH:mm:ss'))
df = df.groupby("Timestamp_resampled").agg(<function of your choice>)
然后一旦重新采样,如果您缺少时间戳,您可以使用带有 join
和 epoch_range
的方法来填充缺少的时间戳,并确保每一秒都有一个时间戳。
我正在使用 pyspark 处理时间序列大数据,我的数据以 GB(100 GB 或更多)为单位,行数以百万或数十亿为单位。我是使用 pyspark 的大数据新手。想要重新采样(下采样)数据原始数据以毫秒为单位的时间戳为 10 Hz 我想以秒为单位将此数据转换为 1 Hz。如果你能给我一些想法,那将非常有帮助。如果你能向我推荐任何我可以用来使用 spark 处理(大数据)大数据的 documentation/solution 也很好。以下是示例数据。 DF=
start_timestamp | end_timestamp | value |
---|---|---|
2020-11-05 03:25:02.088 | 2020-11-05 04:10:19.288 | 0.0 |
2020-11-05 04:24:25.288 | 2020-11-05 04:24:25.218 | 0.4375 |
2020-11-05 04:24:25.218 | 2020-11-05 04:24:25.318 | 1.0625 |
2020-11-05 04:24:25.318 | 2020-11-05 04:24:25.418 | 1.21875 |
2020-11-05 04:24:25.418 | 2020-11-05 04:24:25.518 | 1.234375 |
2020-11-05 04:24:25.518 | 2020-11-05 04:24:25.618 | 1.265625 |
2020-11-05 04:24:25.618 | 2020-11-05 04:24:25.718 | 1.28125 |
我尝试了我获得的代码:
这是我的示例代码:
day = 1 #60 * 60 * 24
epoch = (col("start_timestamp").cast("bigint") / day).cast("bigint") * day
with_epoch = distinctDF.withColumn("epoch", epoch)
min_epoch, max_epoch = with_epoch.select(min_("epoch"), max_("epoch")).first()
ref = spark.range(
min_epoch, max_epoch + 1, day
).toDF("epoch")
(ref
.join(with_epoch, "epoch", "left")
.orderBy("epoch")
.withColumn("start_timestamp_resampled", timestamp_seconds("epoch"))
.show(15, False))
代码正在运行,但我不确定它是否正确:输出如下所示。但是它是否在列中显示空值。
epoch | start_timestamp | end_timestamp | value | start_timestamp_resampled |
---|---|---|---|---|
1604546702 | 2020-11-05 03:25:02.088 | 2020-11-05 04:10:19.288 | 0.0 | 2020-11-05 03:25:02 |
1604546703 | null | null | null | 2020-11-05 03:25:03 |
1604546704 | null | null | null | 2020-11-05 03:25:04 |
1604546705 | null | null | null | 2020-11-05 03:25:05 |
1604546706 | null | null | null | 2020-11-05 03:25:06 |
1604546707 | null | null | null | 2020-11-05 03:25:07 |
缩减采样时,您必须考虑如何处理丢失的数据。
使用联接,您将仅在时间戳匹配时获取数据。但您也可以决定使用以下方法聚合数据点:平均值、最大值、最小值、总和...
我的做法:
import pyspark.sql.functions as F
df = df.withColumn("Timestamp_resampled", F.date_trunc(timestamp, format='yyyy-MM-dd HH:mm:ss'))
df = df.groupby("Timestamp_resampled").agg(<function of your choice>)
然后一旦重新采样,如果您缺少时间戳,您可以使用带有 join
和 epoch_range
的方法来填充缺少的时间戳,并确保每一秒都有一个时间戳。