如何对时间序列大数据进行重采样(Downsample),从10Hz(毫秒)想转换成1Hz(秒)使用pyspark

How to resample (Downsample) the time series big data, from 10 Hz (miliseconds) wants to convert to 1 Hz (seconds) using pyspark

我正在使用 pyspark 处理时间序列大数据,我的数据以 GB(100 GB 或更多)为单位,行数以百万或数十亿为单位。我是使用 pyspark 的大数据新手。想要重新采样(下采样)数据原始数据以毫秒为单位的时间戳为 10 Hz 我想以秒为单位将此数据转换为 1 Hz。如果你能给我一些想法,那将非常有帮助。如果你能向我推荐任何我可以用来使用 spark 处理(大数据)大数据的 documentation/solution 也很好。以下是示例数据。 DF=

start_timestamp end_timestamp value
2020-11-05 03:25:02.088 2020-11-05 04:10:19.288 0.0
2020-11-05 04:24:25.288 2020-11-05 04:24:25.218 0.4375
2020-11-05 04:24:25.218 2020-11-05 04:24:25.318 1.0625
2020-11-05 04:24:25.318 2020-11-05 04:24:25.418 1.21875
2020-11-05 04:24:25.418 2020-11-05 04:24:25.518 1.234375
2020-11-05 04:24:25.518 2020-11-05 04:24:25.618 1.265625
2020-11-05 04:24:25.618 2020-11-05 04:24:25.718 1.28125

我尝试了我获得的代码:

这是我的示例代码:

day = 1   #60 * 60 * 24
epoch = (col("start_timestamp").cast("bigint") / day).cast("bigint") * day

with_epoch = distinctDF.withColumn("epoch", epoch)

min_epoch, max_epoch = with_epoch.select(min_("epoch"), max_("epoch")).first()


ref = spark.range(
    min_epoch, max_epoch + 1, day
).toDF("epoch")  
(ref
    .join(with_epoch, "epoch", "left")
    .orderBy("epoch")
    .withColumn("start_timestamp_resampled", timestamp_seconds("epoch"))
    .show(15, False))

代码正在运行,但我不确定它是否正确:输出如下所示。但是它是否在列中显示空值。

epoch start_timestamp end_timestamp value start_timestamp_resampled
1604546702 2020-11-05 03:25:02.088 2020-11-05 04:10:19.288 0.0 2020-11-05 03:25:02
1604546703 null null null 2020-11-05 03:25:03
1604546704 null null null 2020-11-05 03:25:04
1604546705 null null null 2020-11-05 03:25:05
1604546706 null null null 2020-11-05 03:25:06
1604546707 null null null 2020-11-05 03:25:07

缩减采样时,您必须考虑如何处理丢失的数据。

使用联接,您将仅在时间戳匹配时获取数据。但您也可以决定使用以下方法聚合数据点:平均值、最大值、最小值、总和...

我的做法:

import pyspark.sql.functions as F
df = df.withColumn("Timestamp_resampled", F.date_trunc(timestamp, format='yyyy-MM-dd HH:mm:ss'))
df = df.groupby("Timestamp_resampled").agg(<function of your choice>)

然后一旦重新采样,如果您缺少时间戳,您可以使用带有 joinepoch_range 的方法来填充缺少的时间戳,并确保每一秒都有一个时间戳。