R 使用 doParallel 按因子级别从多个文件中拆分大量数据集的有效方法
R Efficient way to split large amount of datasets from multiple files by level of factor using doParallel
我有大量文件需要读入 R,将它们放入数据框中,然后按特定列(“pracid”)拆分。计算将在集群上执行。我的代码处理的文件数量较少,但是使用所有文件时的数据量对于 R 来说太大了。
期望的输出:包含因子“pracid”每个水平的数据集的单独文件。例如。 file1 包含 pracid==1 的所有数据,file2 包含 pracid==None 的所有数据,依此类推
data.table::rbindlist(dat) 中的错误:列表中的总行数为 3479242206,大于最大行数,当前为 21474833647。
我还必须在另一个包含更多文件的目录中工作。处理如此大量的数据最有效的方法是什么?
library(plyr)
library(tidyverse)
library(broom)
library(dplyr)
library(sqldf)
library(data.table)
library(stringr)
library(lubridate)
library(doParallel)
procs = as.numeric(Sys.getenv("MOAB_PROCCOUNT"))
registerDoParallel(cores=procs)
files = list.files(path = paste0(data_dir, "Consultation"), pattern = "*.txt$", full.names = T)
dat = foreach(i = files) %dopar% read.delim(i)
dat = as.data.frame(data.table::rbindlist(dat))
dat = dat %>% distinct(consid, .keep_all = TRUE) %>% arrange(pracid)
write.table(dat, file = paste0(output_dir, "consultation.txt"), sep = "\t")
# Split by practice
dat.split = as.data.table(dat)
dat.split = split(dat.split, by = c("pracid"))
for (i in 1:length(dat.split)){
write.table(dat.split[i], file=paste0(practice_dir, "Consultation/",
names(dat.split)[i], ".txt"), sep = "\t",
row.names=FALSE, col.names = colnames(dat.split[[1]]))
}
文件格式如下:
patid consid pracid staffid
50000082035 23408234 2002 12003
235235 234234 45666 209
这在 bash 和 awk 中比 R:
更容易和更快
#!/bin/bash
cd /your/data/dir
# it's a better practice to make a separate outdir:
mkdir Consultation_bypracid
find Consultation -name "*.txt" -print0 | xargs -0 awk -F$'\t' '
BEGIN{RS="\r\n|\n|\r";ORS="\n"}
{
# get column index of pracid column
if(ci==""){
header=[=10=]
for(ci=1;ci<=NF;ci++){
if($ci=="pracid") break
}
}
if(FNR>1){ # skip input file header row
pracid = $ci
if(outpath[pracid]==""){
outpath[pracid] = "Consultation_bypracid/"pracid".txt"
print header >> outpath[pracid]
}
print >> outpath[pracid]
}
}'
在我的系统上,一个非常大的 Google 虚拟机和 non-SSD 磁盘,每分钟处理大约 1 GB 的输入数据。此任务使用大量磁盘 I/O 而很少 cpu,因此并行化无济于事,您的磁盘速度将决定其运行速度。
输出文件到 Consultation_bypracid 命名为 [first pracid].txt...[last pracid].txt
我有大量文件需要读入 R,将它们放入数据框中,然后按特定列(“pracid”)拆分。计算将在集群上执行。我的代码处理的文件数量较少,但是使用所有文件时的数据量对于 R 来说太大了。
期望的输出:包含因子“pracid”每个水平的数据集的单独文件。例如。 file1 包含 pracid==1 的所有数据,file2 包含 pracid==None 的所有数据,依此类推
data.table::rbindlist(dat) 中的错误:列表中的总行数为 3479242206,大于最大行数,当前为 21474833647。
我还必须在另一个包含更多文件的目录中工作。处理如此大量的数据最有效的方法是什么?
library(plyr)
library(tidyverse)
library(broom)
library(dplyr)
library(sqldf)
library(data.table)
library(stringr)
library(lubridate)
library(doParallel)
procs = as.numeric(Sys.getenv("MOAB_PROCCOUNT"))
registerDoParallel(cores=procs)
files = list.files(path = paste0(data_dir, "Consultation"), pattern = "*.txt$", full.names = T)
dat = foreach(i = files) %dopar% read.delim(i)
dat = as.data.frame(data.table::rbindlist(dat))
dat = dat %>% distinct(consid, .keep_all = TRUE) %>% arrange(pracid)
write.table(dat, file = paste0(output_dir, "consultation.txt"), sep = "\t")
# Split by practice
dat.split = as.data.table(dat)
dat.split = split(dat.split, by = c("pracid"))
for (i in 1:length(dat.split)){
write.table(dat.split[i], file=paste0(practice_dir, "Consultation/",
names(dat.split)[i], ".txt"), sep = "\t",
row.names=FALSE, col.names = colnames(dat.split[[1]]))
}
文件格式如下:
patid consid pracid staffid
50000082035 23408234 2002 12003
235235 234234 45666 209
这在 bash 和 awk 中比 R:
更容易和更快#!/bin/bash
cd /your/data/dir
# it's a better practice to make a separate outdir:
mkdir Consultation_bypracid
find Consultation -name "*.txt" -print0 | xargs -0 awk -F$'\t' '
BEGIN{RS="\r\n|\n|\r";ORS="\n"}
{
# get column index of pracid column
if(ci==""){
header=[=10=]
for(ci=1;ci<=NF;ci++){
if($ci=="pracid") break
}
}
if(FNR>1){ # skip input file header row
pracid = $ci
if(outpath[pracid]==""){
outpath[pracid] = "Consultation_bypracid/"pracid".txt"
print header >> outpath[pracid]
}
print >> outpath[pracid]
}
}'
在我的系统上,一个非常大的 Google 虚拟机和 non-SSD 磁盘,每分钟处理大约 1 GB 的输入数据。此任务使用大量磁盘 I/O 而很少 cpu,因此并行化无济于事,您的磁盘速度将决定其运行速度。
输出文件到 Consultation_bypracid 命名为 [first pracid].txt...[last pracid].txt