使用有限的 RAM 以类似 sql 的方式连接大文件

Concatenate large files in sql-like way with limited RAM

我有一个很大的 A.csv 文件 (~5 Gb),其中有几列。其中一列是 Model。 还有另一个大型 B.csv 文件 (~15 Gb),其中包含 VendorNameModel 列。

两个问题:

1) 如何创建结果文件,将 A.csv 中的所有列与 B.csv 中相应的 VendorName 相结合(加入 Model ).诀窍是 - 当我的 RAM 只有 4 Gb 并且我正在使用 python.

时如何做到这一点

2) 我如何创建一个样本(比如 1 Gb)结果文件,该文件结合来自 A.csv(所有列)的随机子样本与 VendorName 来自 B.csv。诀窍还是在 4 Gb RAM 中。

我知道如何在 pandas 中做到这一点,但 4 Gb 是我无法克服的限制因素 (

正如@Marc B 所说,一次读取一行是解决方案。 关于加入我会做以下(伪代码:我不知道python)。

  1. "Select distinct Model from A" 第一个文件 A.csv

读取所有行,搜索模型字段并在 list/array/map

中收集不同的值
  1. "Select distinct Model from B" 在第二个文件上 B.csv

与 1 相同的操作,但使用另一个 list/array/map

  1. 找到匹配的模型

比较两个 lists/arrays/maps 只找到匹配的模型(它们将成为连接的一部分)

  1. 加入

读取文件A中匹配模型的行,读取文件B中匹配相同模型的所有行,并将连接结果写入文件C。对于所有型号。

注意:没有特别优化。

对于第 2 点,只需选择一个匹配模型的子集 and/or 读取文件 A and/or B 的部分行与加工模型。

这是一个想法:

第 1 步:按型号对两个文件进行排序。 Mergesort 对此很有用。将每个文件拆分为小到足以在 RAM 中排序的小文件,然后在对每个文件排序后,将它们合并为一个大的排序文件。有关合并多个已排序文件的好方法,请参阅 更新: 请参阅我对 example/code 的回答的结尾。

第二步:将两个文件按Model合并。再次类似Mergesort的合并步骤,遍历两个已排序的文件"in parallel",适当推进每个文件,并通过匹配模型值加入。

步骤 2 的伪代码:

open the two sorted files A and B
blockA = read block of same-model rows from A
blockB = read block of same-model rows from B
while True:
    while model of blockA differs from model of blockB:
        if model of blockA is smaller:
            blockA = read block of same-model rows from A
            quit if there isn't any (i.e. end of file reached)
        else:
            blockB = read block of same-model rows from B
            quit if there isn't any (i.e. end of file reached)
    output the cross product of blockA and blockB

另一个想法:

如果Model比较少,那么按Model分行成文件可能会更好。例如,将行存储在文件 A_Model1.csvA_Model2.csv 等和 B_Model1.csvB_Model2.csv 等中。然后取 A_Model1.csv 和 [=17 的叉积=],A_Model2.csvB_Model2.csv,等等


对于问题 2, 我只是计算行数,使用 random.sample 来选择行号,然后获取这些行。

>>> import random
>>> number_of_rows = 100
>>> number_of_sample_rows = 10
>>> sorted(random.sample(range(number_of_rows), number_of_sample_rows))
[6, 18, 23, 32, 41, 44, 58, 59, 91, 96]

(then go through the file and fetch those rows)

更新: 这是上面第 2 步的 code/demo。我制作了三个文件 B1.csv、B2.csv 和 B3.csv:

Vendor,Name,Model
vfoo,nhi,m1
vbar,nho,m4
vbaz,nhe,m7

Vendor,Name,Model
vZ,nX,m2
vY,nZ,m6
vX,nY,m8

Vendor,Name,Model
v,n3,m3
v,na,m5
v,n_,m9

合并结果文件如下Bmerged.csv:

Vendor,Name,Model
vfoo,nhi,m1
vZ,nX,m2
v,n3,m3
vbar,nho,m4
v,na,m5
vY,nZ,m6
vbaz,nhe,m7
vX,nY,m8
v,n_,m9

这是代码:

import csv, heapq

filenames = ('B1.csv', 'B2.csv', 'B3.csv')

# Prepare the input streams
files = list(map(open, filenames))
readers = [iter(csv.reader(file)) for file in files]
headers = list(map(next, readers))
def model_and_row(row):
    return row[2], row
model_and_row_streams = [map(model_and_row, reader) for reader in readers]

# Merge them into the output file
with open('Bmerged.csv', 'w', newline='') as outfile:
    writer = csv.writer(outfile)
    writer.writerow(headers[0])
    for _, row in heapq.merge(*model_and_row_streams):
        writer.writerow(row)

# Close the input files
for file in files:
    file.close()

请注意,我使用的是 Python 3。在 Python 2 中,您需要使用 itertools.imap(model_and_row, reader) 以免一次将整个文件读入内存。

在Python中逐行读取文件。这是一个非常快速和简单的方法:example

output = open("outputfile.csv", "a")
lines = []
for line in open("file.csv", "r"):
    lines.append(line)
    if len(lines) == 1000000:
        output.writelines(lines)
        del lines[:]
if bool(lines):
    output.writelines(lines)

根据可用RAM调整if语句中数组的长度