什么是读取 CSV 文件的内存有效方式?
What is a memory efficient way to read a CSV file?
我的程序使用 csv
板条箱将 CSV 文件读取到 Vec<Vec<String>>
,其中外部向量代表行,内部向量将行分成列。
use std::{time, thread::{sleep, park}};
use csv;
fn main() {
different_scope();
println!("Parked");
park();
}
fn different_scope() {
println!("Reading csv");
let _data = read_csv("data.csv");
println!("Sleeping");
sleep(time::Duration::from_secs(4));
println!("Going out of scope");
}
fn read_csv(path: &str) -> Vec<Vec<String>> {
let mut rdr = csv::Reader::from_path(path).unwrap();
return rdr
.records()
.map(|row| {
row
.unwrap()
.iter()
.map(|column| column.to_string())
.collect()
})
.collect();
}
我正在查看 htop
的 RAM 使用情况,它使用 2.5GB 的内存来读取 250MB 的 CSV 文件。
这是cat /proc/<my pid>/status
的内容
Name: (name)
Umask: 0002
State: S (sleeping)
Tgid: 18349
Ngid: 0
Pid: 18349
PPid: 18311
TracerPid: 0
Uid: 1000 1000 1000 1000
Gid: 1000 1000 1000 1000
FDSize: 256
Groups: 4 24 27 30 46 118 128 133 1000
NStgid: 18349
NSpid: 18349
NSpgid: 18349
NSsid: 18311
VmPeak: 2748152 kB
VmSize: 2354932 kB
VmLck: 0 kB
VmPin: 0 kB
VmHWM: 2580156 kB
VmRSS: 2345944 kB
RssAnon: 2343900 kB
RssFile: 2044 kB
RssShmem: 0 kB
VmData: 2343884 kB
VmStk: 136 kB
VmExe: 304 kB
VmLib: 2332 kB
VmPTE: 4648 kB
VmSwap: 0 kB
HugetlbPages: 0 kB
CoreDumping: 0
THP_enabled: 1
Threads: 1
SigQ: 0/127783
SigPnd: 0000000000000000
ShdPnd: 0000000000000000
SigBlk: 0000000000000000
SigIgn: 0000000000001000
SigCgt: 0000000180000440
CapInh: 0000000000000000
CapPrm: 0000000000000000
CapEff: 0000000000000000
CapBnd: 0000003fffffffff
CapAmb: 0000000000000000
NoNewPrivs: 0
Seccomp: 0
Speculation_Store_Bypass: thread vulnerable
Cpus_allowed: ffffffff
Cpus_allowed_list: 0-31
Mems_allowed: 00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000001
Mems_allowed_list: 0
voluntary_ctxt_switches: 9
nonvoluntary_ctxt_switches: 293
当我删除变量时,它释放了正确的空间(大约 250MB),但还剩下 2.2GB。在我的所有内存被用完并且进程被终止之前,我无法读取超过 2-3GB 的数据(cargo
打印“Killed”)。
如何释放多余的内存而正在读取 CSV?
我需要处理每一行,但在这种情况下我不需要一次保存所有这些数据,但如果我保存了呢?
问了一个related question and I was pointed to ,对理解问题很有帮助,但是不知道怎么解决
我的理解是我应该将我的 crate 切换到不同的内存分配器,但是强制使用我能找到的所有分配器感觉像是一种无知的方法。
对于有关内存的问题,最好开发一种量化内存使用的技术。您可以通过检查您的 表示 来做到这一点。在本例中,即 Vec<Vec<String>>
。特别是,如果您有一个 250MB 的 CSV 文件,它表示为一系列字段,那么您不一定只使用 250MB 的内存。您需要考虑您的代表的 开销。
对于 Vec<Vec<String>>
,我们可以消除外部 Vec<...>
的开销,因为它将(在您的程序中)在堆栈上而不是堆上。它在堆上的内部 Vec<String>
上。
因此,如果您的 CSV 文件有 M
条记录并且每条记录有 N
字段,那么将有 M
个 Vec<String>
和 M * N
的实例String
的实例。 Vec<T>
和 String
的开销都是 3 * sizeof(word)
,一个字是指向数据的指针,另一个字是长度,另一个字是容量。 (对于 64 位目标,这是 24 个字节。)因此,对于 64 位目标,您的总开销是 (M * 24) + (M * N * 24)
.
让我们通过实验来测试一下。因为你没有分享你的 CSV 输入(你真的应该在未来),我会带来我自己的。它有 145MB,有 M=3,173,958
条记录,每条记录有 N=7
个字段。因此,表示的总开销为 (3173958 * 24) + (3173958 * 7 * 24) = 609,399,936
字节,即 609 MB。让我们用一个真实的程序来测试一下:
fn main() -> Result<(), Box<dyn std::error::Error>> {
let input_path = match std::env::args_os().nth(1) {
Some(p) => p,
None => {
eprintln!("Usage: csvmem <path>");
std::process::exit(1);
}
};
let rdr = csv::Reader::from_path(input_path)?;
let mut records: Vec<Vec<String>> = vec![];
for result in rdr.into_records() {
let mut record: Vec<String> = vec![];
for column in result?.iter() {
record.push(column.to_string());
}
records.push(record);
}
println!("{}", records.len());
Ok(())
}
(我在几个地方添加了一些不必要的类型注释以使代码更清晰一些,特别是关于我们的表示。)所以让我们 运行 这个程序(它唯一的依赖是 csv = "1"
在我的 Cargo.toml
):
$ echo $TIMEFMT
real %*E user %*U sys %*S maxmem %M MB faults %F
$ cargo b --release
$ time ./target/release/csvmem /m/sets/csv/pop/worldcitiespop-nice.csv
3173958
real 1.542
user 1.236
sys 0.296
maxmem 1287 MB
faults 0
此处的 time
实用程序报告 peak 内存使用情况,实际上比我们预期的要高一点:609 + 145 = 754MB
。我不太了解分配器,无法完全理解差异。可能是我正在使用的系统分配器分配了比实际需要的更大的块。让我们通过使用 Box<str>
而不是 String
来使我们的表示更有效。我们牺牲了扩展字符串的能力,但作为交换,我们为每个字段节省了 8 个字节的开销。因此,我们的新开销计算结果为 (3173958 * 24) + (3173958 * 7 * 16) = 431,658,288
字节或 431MB,相差 609 - 431 = 178MB
。那么让我们测试一下我们的新表示,看看我们的增量是多少:
fn main() -> Result<(), Box<dyn std::error::Error>> {
let input_path = match std::env::args_os().nth(1) {
Some(p) => p,
None => {
eprintln!("Usage: csvmem <path>");
std::process::exit(1);
}
};
let rdr = csv::Reader::from_path(input_path)?;
let mut records: Vec<Vec<Box<str>>> = vec![];
for result in rdr.into_records() {
let mut record: Vec<Box<str>> = vec![];
for column in result?.iter() {
record.push(column.to_string().into());
}
records.push(record);
}
println!("{}", records.len());
Ok(())
}
并编译 运行:
$ cargo b --release
$ time ./target/release/csvmem /m/sets/csv/pop/worldcitiespop-nice.csv
3173958
real 1.459
user 1.183
sys 0.266
maxmem 1093 MB
faults 0
总增量为 194MB。这与我们的猜测非常接近。
我们可以使用 Vec<Box<[Box<str>]>>
:
进一步优化表示
fn main() -> Result<(), Box<dyn std::error::Error>> {
let input_path = match std::env::args_os().nth(1) {
Some(p) => p,
None => {
eprintln!("Usage: csvmem <path>");
std::process::exit(1);
}
};
let rdr = csv::Reader::from_path(input_path)?;
let mut records: Vec<Box<[Box<str>]>> = vec![];
for result in rdr.into_records() {
let mut record: Vec<Box<str>> = vec![];
for column in result?.iter() {
record.push(column.to_string().into());
}
records.push(record.into());
}
println!("{}", records.len());
Ok(())
}
这给出了 1069 MB 的峰值内存使用量。所以节省的钱不多。
但是,我们能做的最好的事情就是使用 csv::StringRecord
:
fn main() -> Result<(), Box<dyn std::error::Error>> {
let input_path = match std::env::args_os().nth(1) {
Some(p) => p,
None => {
eprintln!("Usage: csvmem <path>");
std::process::exit(1);
}
};
let rdr = csv::Reader::from_path(input_path)?;
let mut records = vec![];
for result in rdr.into_records() {
let record = result?;
records.push(record);
}
println!("{}", records.len());
Ok(())
}
这给出了 727MB 的峰值内存使用量。秘诀在于 StringRecord
以内联方式存储字段,而无需第二层间接寻址。最终节省了不少!
当然,如果您不需要一次将所有记录存储在内存中,那么您不应该这样做。 CSV crate 支持很好:
fn main() -> Result<(), Box<dyn std::error::Error>> {
let input_path = match std::env::args_os().nth(1) {
Some(p) => p,
None => {
eprintln!("Usage: csvmem <path>");
std::process::exit(1);
}
};
let mut count = 0;
let rdr = csv::Reader::from_path(input_path)?;
for result in rdr.into_records() {
let _ = result?;
count += 1;
}
println!("{}", count);
Ok(())
}
而且该程序的内存使用峰值仅为 9MB,正如您对流式实现所期望的那样。 (从技术上讲,如果您下拉并使用 csv-core
板条箱,则完全可以不使用堆内存。)
我的程序使用 csv
板条箱将 CSV 文件读取到 Vec<Vec<String>>
,其中外部向量代表行,内部向量将行分成列。
use std::{time, thread::{sleep, park}};
use csv;
fn main() {
different_scope();
println!("Parked");
park();
}
fn different_scope() {
println!("Reading csv");
let _data = read_csv("data.csv");
println!("Sleeping");
sleep(time::Duration::from_secs(4));
println!("Going out of scope");
}
fn read_csv(path: &str) -> Vec<Vec<String>> {
let mut rdr = csv::Reader::from_path(path).unwrap();
return rdr
.records()
.map(|row| {
row
.unwrap()
.iter()
.map(|column| column.to_string())
.collect()
})
.collect();
}
我正在查看 htop
的 RAM 使用情况,它使用 2.5GB 的内存来读取 250MB 的 CSV 文件。
这是cat /proc/<my pid>/status
Name: (name)
Umask: 0002
State: S (sleeping)
Tgid: 18349
Ngid: 0
Pid: 18349
PPid: 18311
TracerPid: 0
Uid: 1000 1000 1000 1000
Gid: 1000 1000 1000 1000
FDSize: 256
Groups: 4 24 27 30 46 118 128 133 1000
NStgid: 18349
NSpid: 18349
NSpgid: 18349
NSsid: 18311
VmPeak: 2748152 kB
VmSize: 2354932 kB
VmLck: 0 kB
VmPin: 0 kB
VmHWM: 2580156 kB
VmRSS: 2345944 kB
RssAnon: 2343900 kB
RssFile: 2044 kB
RssShmem: 0 kB
VmData: 2343884 kB
VmStk: 136 kB
VmExe: 304 kB
VmLib: 2332 kB
VmPTE: 4648 kB
VmSwap: 0 kB
HugetlbPages: 0 kB
CoreDumping: 0
THP_enabled: 1
Threads: 1
SigQ: 0/127783
SigPnd: 0000000000000000
ShdPnd: 0000000000000000
SigBlk: 0000000000000000
SigIgn: 0000000000001000
SigCgt: 0000000180000440
CapInh: 0000000000000000
CapPrm: 0000000000000000
CapEff: 0000000000000000
CapBnd: 0000003fffffffff
CapAmb: 0000000000000000
NoNewPrivs: 0
Seccomp: 0
Speculation_Store_Bypass: thread vulnerable
Cpus_allowed: ffffffff
Cpus_allowed_list: 0-31
Mems_allowed: 00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000001
Mems_allowed_list: 0
voluntary_ctxt_switches: 9
nonvoluntary_ctxt_switches: 293
当我删除变量时,它释放了正确的空间(大约 250MB),但还剩下 2.2GB。在我的所有内存被用完并且进程被终止之前,我无法读取超过 2-3GB 的数据(cargo
打印“Killed”)。
如何释放多余的内存而正在读取 CSV?
我需要处理每一行,但在这种情况下我不需要一次保存所有这些数据,但如果我保存了呢?
问了一个related question and I was pointed to
我的理解是我应该将我的 crate 切换到不同的内存分配器,但是强制使用我能找到的所有分配器感觉像是一种无知的方法。
对于有关内存的问题,最好开发一种量化内存使用的技术。您可以通过检查您的 表示 来做到这一点。在本例中,即 Vec<Vec<String>>
。特别是,如果您有一个 250MB 的 CSV 文件,它表示为一系列字段,那么您不一定只使用 250MB 的内存。您需要考虑您的代表的 开销。
对于 Vec<Vec<String>>
,我们可以消除外部 Vec<...>
的开销,因为它将(在您的程序中)在堆栈上而不是堆上。它在堆上的内部 Vec<String>
上。
因此,如果您的 CSV 文件有 M
条记录并且每条记录有 N
字段,那么将有 M
个 Vec<String>
和 M * N
的实例String
的实例。 Vec<T>
和 String
的开销都是 3 * sizeof(word)
,一个字是指向数据的指针,另一个字是长度,另一个字是容量。 (对于 64 位目标,这是 24 个字节。)因此,对于 64 位目标,您的总开销是 (M * 24) + (M * N * 24)
.
让我们通过实验来测试一下。因为你没有分享你的 CSV 输入(你真的应该在未来),我会带来我自己的。它有 145MB,有 M=3,173,958
条记录,每条记录有 N=7
个字段。因此,表示的总开销为 (3173958 * 24) + (3173958 * 7 * 24) = 609,399,936
字节,即 609 MB。让我们用一个真实的程序来测试一下:
fn main() -> Result<(), Box<dyn std::error::Error>> {
let input_path = match std::env::args_os().nth(1) {
Some(p) => p,
None => {
eprintln!("Usage: csvmem <path>");
std::process::exit(1);
}
};
let rdr = csv::Reader::from_path(input_path)?;
let mut records: Vec<Vec<String>> = vec![];
for result in rdr.into_records() {
let mut record: Vec<String> = vec![];
for column in result?.iter() {
record.push(column.to_string());
}
records.push(record);
}
println!("{}", records.len());
Ok(())
}
(我在几个地方添加了一些不必要的类型注释以使代码更清晰一些,特别是关于我们的表示。)所以让我们 运行 这个程序(它唯一的依赖是 csv = "1"
在我的 Cargo.toml
):
$ echo $TIMEFMT
real %*E user %*U sys %*S maxmem %M MB faults %F
$ cargo b --release
$ time ./target/release/csvmem /m/sets/csv/pop/worldcitiespop-nice.csv
3173958
real 1.542
user 1.236
sys 0.296
maxmem 1287 MB
faults 0
此处的 time
实用程序报告 peak 内存使用情况,实际上比我们预期的要高一点:609 + 145 = 754MB
。我不太了解分配器,无法完全理解差异。可能是我正在使用的系统分配器分配了比实际需要的更大的块。让我们通过使用 Box<str>
而不是 String
来使我们的表示更有效。我们牺牲了扩展字符串的能力,但作为交换,我们为每个字段节省了 8 个字节的开销。因此,我们的新开销计算结果为 (3173958 * 24) + (3173958 * 7 * 16) = 431,658,288
字节或 431MB,相差 609 - 431 = 178MB
。那么让我们测试一下我们的新表示,看看我们的增量是多少:
fn main() -> Result<(), Box<dyn std::error::Error>> {
let input_path = match std::env::args_os().nth(1) {
Some(p) => p,
None => {
eprintln!("Usage: csvmem <path>");
std::process::exit(1);
}
};
let rdr = csv::Reader::from_path(input_path)?;
let mut records: Vec<Vec<Box<str>>> = vec![];
for result in rdr.into_records() {
let mut record: Vec<Box<str>> = vec![];
for column in result?.iter() {
record.push(column.to_string().into());
}
records.push(record);
}
println!("{}", records.len());
Ok(())
}
并编译 运行:
$ cargo b --release
$ time ./target/release/csvmem /m/sets/csv/pop/worldcitiespop-nice.csv
3173958
real 1.459
user 1.183
sys 0.266
maxmem 1093 MB
faults 0
总增量为 194MB。这与我们的猜测非常接近。
我们可以使用 Vec<Box<[Box<str>]>>
:
fn main() -> Result<(), Box<dyn std::error::Error>> {
let input_path = match std::env::args_os().nth(1) {
Some(p) => p,
None => {
eprintln!("Usage: csvmem <path>");
std::process::exit(1);
}
};
let rdr = csv::Reader::from_path(input_path)?;
let mut records: Vec<Box<[Box<str>]>> = vec![];
for result in rdr.into_records() {
let mut record: Vec<Box<str>> = vec![];
for column in result?.iter() {
record.push(column.to_string().into());
}
records.push(record.into());
}
println!("{}", records.len());
Ok(())
}
这给出了 1069 MB 的峰值内存使用量。所以节省的钱不多。
但是,我们能做的最好的事情就是使用 csv::StringRecord
:
fn main() -> Result<(), Box<dyn std::error::Error>> {
let input_path = match std::env::args_os().nth(1) {
Some(p) => p,
None => {
eprintln!("Usage: csvmem <path>");
std::process::exit(1);
}
};
let rdr = csv::Reader::from_path(input_path)?;
let mut records = vec![];
for result in rdr.into_records() {
let record = result?;
records.push(record);
}
println!("{}", records.len());
Ok(())
}
这给出了 727MB 的峰值内存使用量。秘诀在于 StringRecord
以内联方式存储字段,而无需第二层间接寻址。最终节省了不少!
当然,如果您不需要一次将所有记录存储在内存中,那么您不应该这样做。 CSV crate 支持很好:
fn main() -> Result<(), Box<dyn std::error::Error>> {
let input_path = match std::env::args_os().nth(1) {
Some(p) => p,
None => {
eprintln!("Usage: csvmem <path>");
std::process::exit(1);
}
};
let mut count = 0;
let rdr = csv::Reader::from_path(input_path)?;
for result in rdr.into_records() {
let _ = result?;
count += 1;
}
println!("{}", count);
Ok(())
}
而且该程序的内存使用峰值仅为 9MB,正如您对流式实现所期望的那样。 (从技术上讲,如果您下拉并使用 csv-core
板条箱,则完全可以不使用堆内存。)