优化 R 中的时间序列聚合

Optimize time series aggregation in R

我有很大的时间序列(日期时间、值、实例),在可视化之前,我需要使用每个实例的每个时间间隔(在我的示例中为 15 分钟)的最大值来聚合数据。

我没有在 R 中找到本机聚合函数,因此我使用 celling_data 和 cut 方法创建了 2 个自定义函数。看我的例子:

library(tidyverse)
library(lubridate)


agg_fun_1 <- function (data, aggregation_period = 900) {

agg_period <- paste(aggregation_period, "secs")

agg_data <- data %>%  
    group_by(across(-c(Value, datetime)),  
             datetime = as.POSIXct(cut(datetime, agg_period)) + aggregation_period) %>%
    summarise (Value = max(Value) , .groups = "drop") %>% 
    mutate(Value = ifelse(is.infinite(Value), NA, Value))

return (agg_data)

}


agg_fun_2 <- function (data, aggregation_period = "15 mins") {

    agg_data <- data %>% 
        group_by(across(-c(Value, datetime)), datetime = ceiling_date (datetime, aggregation_period))
    
    suppressWarnings(
        agg_data <- agg_data %>% 
            summarise(Value = max(Value,  na.rm = F), .groups = "drop") %>% 
            mutate(Value = ifelse(is.infinite(Value), NA, Value))
    )   
    
    return (agg_data)
    
}


set.seed(42)

example_data <- tibble()

for(i in 1:256) {
    
    example_data <- rbind(example_data,
        
        data.frame( Instance = rep(i,20002),
                     datetime = seq.POSIXt(as.POSIXct("2020-12-26 10:00:00"), as.POSIXct("2020-12-26 10:00:00") + 15*20001, "15 sec"),
                     Value = sample(0:1000, 20002, replace=TRUE)
                     )   
    )
    
}

gc()

start_time <- Sys.time()

agg_fun_1(example_data)

end_time <- Sys.time()
end_time - start_time

gc()

start_time <- Sys.time()

agg_fun_2(example_data)

end_time <- Sys.time()
end_time - start_time

在真实环境中,我会运行 8 个并行 R 脚本,我的数据可能比我使用的示例大 3-5 倍。那样的话,我可能会面临资源不足的问题。

是否有任何方法可以优化函数的 RAM 使用和执行时间,或者是否有更好的聚合函数?

大部分时间都花在计算您的总时间上(削减或ceiling_date)。尝试在没有其余代码的情况下计算它以查看它。

使用日期时间作为自原点以来的秒数的直接计算速度更快(特别是如果您的 agg_period 可以很容易地作为秒数给出)

这里使用 data.table 而不是 tidyverse(但应该对 tidy 做同样的事情)。

# data.frame on steroids
library(data.table)

# transform you data as data.table
example_data <- setDT(example_data)

agg_period <- 15*60 # seconds

agg_data <- example_data[, 
  # group by aggregated datetime
  by=as_datetime(floor(as.integer(datetime)/agg_period)*agg_period),
  # compute Value as max Value of group
  .(Value=max(Value))
]

# replace infinite Value with NA (:= means inplace)
agg_data[is.infinite(Value), Value:=NA]