使用块对大文件进行排序
Sorting a big file using its chunks
假设我们要对 column=X
周围有 40000 行的文件进行排序。我们还假设相同的值在 table 中广泛存在,因此在 column=X
中具有相同值的行不仅会在前 1000 行中找到。现在,如果我们按块读取文件并只考虑 1000 行,如果我们要再次对该列周围的 table 进行排序,我们可能会将 column=X
中具有相同值的其他行弄乱。那么我们如何解决这个问题呢?由于没有可用数据,因此不需要代码,但我正在寻找您对此事的意见?我们是否应该通过 merge sort 将每个块并行地交给合并排序算法,然后重新组合结果?我没有看到 pandas 可以做到这一点,但我不确定?
import pandas as pd
chunk_size = 1000
batch_no = 1
for chunk in pd.read_csv('data.csv', chunksize=chunk_size):
chunk.sort_values(by='X', inplace=True)
chunk.to_csv('data' +str(batch_no) + '.csv', index=False)
batch_no +=1
您需要 merge 排序的 csv 文件,幸运的是 Python 提供了一个功能。如下使用:
from operator import itemgetter
import pandas as pd
import numpy as np
import csv
import heapq
# generate test data
test_data = pd.DataFrame(data=[[f"label{i}", val] for i, val in enumerate(np.random.uniform(size=40000))],
columns=["label", "X"])
test_data.to_csv("data.csv", index=False)
# read and sort each chunk
chunk_size = 1000
file_names = []
for batch_no, chunk in enumerate(pd.read_csv("data.csv", chunksize=chunk_size), 1):
chunk.sort_values(by="X", inplace=True)
file_name = f"data_{batch_no}.csv"
chunk.to_csv(file_name, index=False)
file_names.append(file_name)
# merge the chunks
chunks = [csv.DictReader(open(file_name)) for file_name in file_names]
with open("data_sorted.csv", "w") as outfile:
field_names = ["label", "X"]
writer = csv.DictWriter(outfile, fieldnames=field_names)
writer.writeheader()
for row in heapq.merge(*chunks, key=itemgetter("X")):
writer.writerow(row)
来自 heapq.merge 上的文档:
Merge multiple sorted inputs into a single sorted output (for example,
merge timestamped entries from multiple log files). Returns an
iterator over the sorted values.
Similar to sorted(itertools.chain(*iterables)) but returns an
iterable, does not pull the data into memory all at once, and assumes
that each of the input streams is already sorted (smallest to
largest).
因此,使用 heapq.merge 不会将所有数据加载到内存中。还值得注意的是,此函数的复杂度为 O(n)
,其中 n 是整个数据的大小。因此整体排序算法为O(nlogn)
假设我们要对 column=X
周围有 40000 行的文件进行排序。我们还假设相同的值在 table 中广泛存在,因此在 column=X
中具有相同值的行不仅会在前 1000 行中找到。现在,如果我们按块读取文件并只考虑 1000 行,如果我们要再次对该列周围的 table 进行排序,我们可能会将 column=X
中具有相同值的其他行弄乱。那么我们如何解决这个问题呢?由于没有可用数据,因此不需要代码,但我正在寻找您对此事的意见?我们是否应该通过 merge sort 将每个块并行地交给合并排序算法,然后重新组合结果?我没有看到 pandas 可以做到这一点,但我不确定?
import pandas as pd
chunk_size = 1000
batch_no = 1
for chunk in pd.read_csv('data.csv', chunksize=chunk_size):
chunk.sort_values(by='X', inplace=True)
chunk.to_csv('data' +str(batch_no) + '.csv', index=False)
batch_no +=1
您需要 merge 排序的 csv 文件,幸运的是 Python 提供了一个功能。如下使用:
from operator import itemgetter
import pandas as pd
import numpy as np
import csv
import heapq
# generate test data
test_data = pd.DataFrame(data=[[f"label{i}", val] for i, val in enumerate(np.random.uniform(size=40000))],
columns=["label", "X"])
test_data.to_csv("data.csv", index=False)
# read and sort each chunk
chunk_size = 1000
file_names = []
for batch_no, chunk in enumerate(pd.read_csv("data.csv", chunksize=chunk_size), 1):
chunk.sort_values(by="X", inplace=True)
file_name = f"data_{batch_no}.csv"
chunk.to_csv(file_name, index=False)
file_names.append(file_name)
# merge the chunks
chunks = [csv.DictReader(open(file_name)) for file_name in file_names]
with open("data_sorted.csv", "w") as outfile:
field_names = ["label", "X"]
writer = csv.DictWriter(outfile, fieldnames=field_names)
writer.writeheader()
for row in heapq.merge(*chunks, key=itemgetter("X")):
writer.writerow(row)
来自 heapq.merge 上的文档:
Merge multiple sorted inputs into a single sorted output (for example, merge timestamped entries from multiple log files). Returns an iterator over the sorted values.
Similar to sorted(itertools.chain(*iterables)) but returns an iterable, does not pull the data into memory all at once, and assumes that each of the input streams is already sorted (smallest to largest).
因此,使用 heapq.merge 不会将所有数据加载到内存中。还值得注意的是,此函数的复杂度为 O(n)
,其中 n 是整个数据的大小。因此整体排序算法为O(nlogn)