并行化 for 循环分析 data.frame

Parallelization over for loop analyzing a data.frame

这些天我一直在处理一个 data.frame 的 8M 寄存器,我需要改进一个分析这些数据的循环。

我将描述我试图解决的问题的每个过程。 首先,我必须按 ClientID、Date 和 Time 三个字段按升序排列所有 data.frame。 (查看) 然后,使用排列的 data.frame,我必须对每个观察值之间的差异进行操作,只有在 ClientID 相同的情况下才能这样做。例如:

ClientID|Date(YMD)|Time(HMS)
A|20120101|110000
A|20120101|111500
A|20120101|120000
B|20120202|010000
B|20120202|012030

根据上面的资料,我要得到的结果是:

ClientID|Date(YMD)|Time(HMS)|Difference(minutes)
A|20120101|110000|0.00
A|20120101|111500|15.00
A|20120101|120000|45.00
B|20120202|010000|0
B|20120202|012030|20.30

现在的问题是,用 data.frame 的 800 万次观察分析所有这些,大约需要 3 天时间。我希望我可以并行化这个过程。我的想法是 data.frame 可以按簇进行分割,但是这种分割可以是有序的而不是随机的,然后使用库 foreach 或其他库,可以按簇进行分析并将其设置为核心可用。例如:

Cluster|ClientID|Date(YMD)|Time(HMS)
CORE 1|
1|A|20120101|110000
1|A|20120101|111500
1|A|20120101|120000
CORE 2|
2|B|20120202|010000
2|B|20120202|012030

我不建议尝试将其并行化。使用 data.table 包并处理以整数格式存储的时间,这应该花费很少的时间。

生成一些示例数据

library(data.table)

## Generate Data
RowCount <- 8e6
GroupCount <-1e4

DT <- data.table(ClientID = paste0("Client ",sample.int(GroupCount,size = RowCount, replace = TRUE)),
                 Time = sample.int(12,size = RowCount, replace = TRUE)*900)

DT[, Time := cumsum(Time), keyby = .(ClientID)]
DT[, Time := as.POSIXct(Time, tz = "UTC", origin = "1970-01-01 00:00:00")]

print(DT)

给予

            ClientID                Time
      1:    Client 1 1970-01-01 02:30:00
      2:    Client 1 1970-01-01 04:00:00
      3:    Client 1 1970-01-01 05:30:00
      4:    Client 1 1970-01-01 07:00:00
      5:    Client 1 1970-01-01 10:00:00
     ---                                
7999996: Client 9999 1970-02-20 18:15:00
7999997: Client 9999 1970-02-20 18:30:00
7999998: Client 9999 1970-02-20 21:00:00
7999999: Client 9999 1970-02-20 22:45:00
8000000: Client 9999 1970-02-21 00:30:00

计算时间差

system.time({
  ## Create a integer column that stores time as the number of seconds midnight on 1970
  DT[,Time_Unix := as.integer(Time)]

  ## Order by ClientID then Time_Unix
  setkey(DT, ClientID, Time_Unix)

  ## Calculate Elapsed Time in minutes between rows, grouped by ClientID
  DT[, Elapsed_Minutes := (Time_Unix - shift(Time_Unix, n = 1L, type = "lag", fill = NA))/60L, keyby = .(ClientID)]

  ## Clean up the integer time
  DT[,Time_Unix := NULL]
})

...

   user  system elapsed 
  0.416   0.025   0.442 

结果:

print(DT)

...

            ClientID                Time Elapsed_Minutes
      1:    Client 1 1970-01-01 02:30:00              NA
      2:    Client 1 1970-01-01 04:00:00              90
      3:    Client 1 1970-01-01 05:30:00              90
      4:    Client 1 1970-01-01 07:00:00              90
      5:    Client 1 1970-01-01 10:00:00             180
     ---                                                
7999996: Client 9999 1970-02-20 18:15:00             135
7999997: Client 9999 1970-02-20 18:30:00              15
7999998: Client 9999 1970-02-20 21:00:00             150
7999999: Client 9999 1970-02-20 22:45:00             105
8000000: Client 9999 1970-02-21 00:30:00             105