什么是读取 CSV 文件的内存有效方式?

What is a memory efficient way to read a CSV file?

我的程序使用 csv 板条箱将 CSV 文件读取到 Vec<Vec<String>>,其中外部向量代表行,内部向量将行分成列。

use std::{time, thread::{sleep, park}};
use csv;

fn main() {
    different_scope();

    println!("Parked");
    park();
}

fn different_scope() {
    println!("Reading csv");
    let _data = read_csv("data.csv");

    println!("Sleeping");
    sleep(time::Duration::from_secs(4));

    println!("Going out of scope");
}

fn read_csv(path: &str) -> Vec<Vec<String>> {
    let mut rdr = csv::Reader::from_path(path).unwrap();

    return rdr
        .records()
        .map(|row| {
            row
                .unwrap()
                .iter()
                .map(|column| column.to_string())
                .collect()
        })
        .collect();
}

我正在查看 htop 的 RAM 使用情况,它使用 2.5GB 的内存来读取 250MB 的 CSV 文件。

这是cat /proc/<my pid>/status

的内容
Name:   (name)
Umask:  0002
State:  S (sleeping)
Tgid:   18349
Ngid:   0
Pid:    18349
PPid:   18311
TracerPid:  0
Uid:    1000    1000    1000    1000
Gid:    1000    1000    1000    1000
FDSize: 256
Groups: 4 24 27 30 46 118 128 133 1000 
NStgid: 18349
NSpid:  18349
NSpgid: 18349
NSsid:  18311
VmPeak:  2748152 kB
VmSize:  2354932 kB
VmLck:         0 kB
VmPin:         0 kB
VmHWM:   2580156 kB
VmRSS:   2345944 kB
RssAnon:     2343900 kB
RssFile:        2044 kB
RssShmem:          0 kB
VmData:  2343884 kB
VmStk:       136 kB
VmExe:       304 kB
VmLib:      2332 kB
VmPTE:      4648 kB
VmSwap:        0 kB
HugetlbPages:          0 kB
CoreDumping:    0
THP_enabled:    1
Threads:    1
SigQ:   0/127783
SigPnd: 0000000000000000
ShdPnd: 0000000000000000
SigBlk: 0000000000000000
SigIgn: 0000000000001000
SigCgt: 0000000180000440
CapInh: 0000000000000000
CapPrm: 0000000000000000
CapEff: 0000000000000000
CapBnd: 0000003fffffffff
CapAmb: 0000000000000000
NoNewPrivs: 0
Seccomp:    0
Speculation_Store_Bypass:   thread vulnerable
Cpus_allowed:   ffffffff
Cpus_allowed_list:  0-31
Mems_allowed:   00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000001
Mems_allowed_list:  0
voluntary_ctxt_switches:    9
nonvoluntary_ctxt_switches: 293

当我删除变量时,它释放了正确的空间(大约 250MB),但还剩下 2.2GB。在我的所有内存被用完并且进程被终止之前,我无法读取超过 2-3GB 的数据(cargo 打印“Killed”)。

如何释放多余的内存正在读取 CSV?

我需要处理每一行,但在这种情况下我不需要一次保存所有这些数据,但如果我保存了呢?

问了一个related question and I was pointed to ,对理解问题很有帮助,但是不知道怎么解决

我的理解是我应该将我的 crate 切换到不同的内存分配器,但是强制使用我能找到的所有分配器感觉像是一种无知的方法。

对于有关内存的问题,最好开发一种量化内存使用的技术。您可以通过检查您的 表示 来做到这一点。在本例中,即 Vec<Vec<String>>。特别是,如果您有一个 250MB 的 CSV 文件,它表示为一系列字段,那么您不一定只使用 250MB 的内存。您需要考虑您的代表的 开销

对于 Vec<Vec<String>>,我们可以消除外部 Vec<...> 的开销,因为它将(在您的程序中)在堆栈上而不是堆上。它在堆上的内部 Vec<String> 上。

因此,如果您的 CSV 文件有 M 条记录并且每条记录有 N 字段,那么将有 MVec<String>M * N 的实例String 的实例。 Vec<T>String 的开销都是 3 * sizeof(word),一个字是指向数据的指针,另一个字是长度,另一个字是容量。 (对于 64 位目标,这是 24 个字节。)因此,对于 64 位目标,您的总开销是 (M * 24) + (M * N * 24).

让我们通过实验来测试一下。因为你没有分享你的 CSV 输入(你真的应该在未来),我会带来我自己的。它有 145MB,有 M=3,173,958 条记录,每条记录有 N=7 个字段。因此,表示的总开销为 (3173958 * 24) + (3173958 * 7 * 24) = 609,399,936 字节,即 609 MB。让我们用一个真实的程序来测试一下:

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let input_path = match std::env::args_os().nth(1) {
        Some(p) => p,
        None => {
            eprintln!("Usage: csvmem <path>");
            std::process::exit(1);
        }
    };
    let rdr = csv::Reader::from_path(input_path)?;
    let mut records: Vec<Vec<String>> = vec![];
    for result in rdr.into_records() {
        let mut record: Vec<String> = vec![];
        for column in result?.iter() {
            record.push(column.to_string());
        }
        records.push(record);
    }
    println!("{}", records.len());
    Ok(())
}

(我在几个地方添加了一些不必要的类型注释以使代码更清晰一些,特别是关于我们的表示。)所以让我们 运行 这个程序(它唯一的依赖是 csv = "1" 在我的 Cargo.toml):

$ echo $TIMEFMT
real %*E user %*U sys %*S maxmem %M MB faults %F
$ cargo b --release
$ time ./target/release/csvmem /m/sets/csv/pop/worldcitiespop-nice.csv
3173958

real    1.542
user    1.236
sys     0.296
maxmem  1287 MB
faults  0

此处的 time 实用程序报告 peak 内存使用情况,实际上比我们预期的要高一点:609 + 145 = 754MB。我不太了解分配器,无法完全理解差异。可能是我正在使用的系统分配器分配了比实际需要的更大的块。让我们通过使用 Box<str> 而不是 String 来使我们的表示更有效。我们牺牲了扩展字符串的能力,但作为交换,我们为每个字段节省了 8 个字节的开销。因此,我们的新开销计算结果为 (3173958 * 24) + (3173958 * 7 * 16) = 431,658,288 字节或 431MB,相差 609 - 431 = 178MB。那么让我们测试一下我们的新表示,看看我们的增量是多少:

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let input_path = match std::env::args_os().nth(1) {
        Some(p) => p,
        None => {
            eprintln!("Usage: csvmem <path>");
            std::process::exit(1);
        }
    };
    let rdr = csv::Reader::from_path(input_path)?;
    let mut records: Vec<Vec<Box<str>>> = vec![];
    for result in rdr.into_records() {
        let mut record: Vec<Box<str>> = vec![];
        for column in result?.iter() {
            record.push(column.to_string().into());
        }
        records.push(record);
    }
    println!("{}", records.len());
    Ok(())
}

并编译 运行:

$ cargo b --release
$ time ./target/release/csvmem /m/sets/csv/pop/worldcitiespop-nice.csv
3173958

real    1.459
user    1.183
sys     0.266
maxmem  1093 MB
faults  0

总增量为 194MB。这与我们的猜测非常接近。

我们可以使用 Vec<Box<[Box<str>]>>:

进一步优化表示
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let input_path = match std::env::args_os().nth(1) {
        Some(p) => p,
        None => {
            eprintln!("Usage: csvmem <path>");
            std::process::exit(1);
        }
    };
    let rdr = csv::Reader::from_path(input_path)?;
    let mut records: Vec<Box<[Box<str>]>> = vec![];
    for result in rdr.into_records() {
        let mut record: Vec<Box<str>> = vec![];
        for column in result?.iter() {
            record.push(column.to_string().into());
        }
        records.push(record.into());
    }
    println!("{}", records.len());
    Ok(())
}

这给出了 1069 MB 的峰值内存使用量。所以节省的钱不多。

但是,我们能做的最好的事情就是使用 csv::StringRecord:

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let input_path = match std::env::args_os().nth(1) {
        Some(p) => p,
        None => {
            eprintln!("Usage: csvmem <path>");
            std::process::exit(1);
        }
    };
    let rdr = csv::Reader::from_path(input_path)?;
    let mut records = vec![];
    for result in rdr.into_records() {
        let record = result?;
        records.push(record);
    }
    println!("{}", records.len());
    Ok(())
}

这给出了 727MB 的峰值内存使用量。秘诀在于 StringRecord 以内联方式存储字段,而无需第二层间接寻址。最终节省了不少!

当然,如果您不需要一次将所有记录存储在内存中,那么您不应该这样做。 CSV crate 支持很好:

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let input_path = match std::env::args_os().nth(1) {
        Some(p) => p,
        None => {
            eprintln!("Usage: csvmem <path>");
            std::process::exit(1);
        }
    };
    let mut count = 0;
    let rdr = csv::Reader::from_path(input_path)?;
    for result in rdr.into_records() {
        let _ = result?;
        count += 1;
    }
    println!("{}", count);
    Ok(())
}

而且该程序的内存使用峰值仅为 9MB,正如您对流式实现所期望的那样。 (从技术上讲,如果您下拉并使用 csv-core 板条箱,则完全可以不使用堆内存。)