如何按出现次数删除两个大文本文件中的重复行?
How to remove duplicate lines in two large text files by number of appearance?
我有两个行数相同的大文本文件(900 万行,每个文件大约 12 GB)。所以它们无法加载到内存中。
table 中显示的这些文本文件中的行如下所示:
我需要删除 A.txt 和 [=39 中的重复项=] 并且只为来自 A.txt 的每一行保留最频繁的组合。如果有 2 个最频繁的行且重复次数相同,程序应选择最先出现在文本中的行并删除所有其他行。
在实际文件中,行不仅仅是 (A,B,C,D,...1,6,7,..),每行大约有 2000 个字符。
table 中显示的最终文本文件应如下所示:
此选项不需要内存中的整个文件,但需要保留一个以 A 为键的字典和多个以 B 为键的字典。如果您可以对值进行散列或分类(为每个唯一的 A 和每个唯一的 B 分配一个 int 值),这可以简化。
编辑:更改字典以使用散列键减少内存占用,但代价是 CPU 并更改输出以显示要保留的行(因为原始 A 和 B 值被混淆)
假设我的文件是:
Lines,A.txt,B.txt
1,A,1
2,A,1
3,A,2
4,B,1
5,B,2
from collections import Counter
from csv import DictReader
_ = {}
_file = DictReader(open('abc.txt', 'r'), delimiter=',')
hash_to_line = {}
for row in _file:
a = hash(row['A.txt'])
b = hash(row['B.txt'])
if a not in _:
_[a] = Counter()
if b not in _[a]:
hash_to_line[(a, b)] = row['Lines']
_[a][b] += 1
output = []
for A in _:
_vals = list(_[A].values())
_keys = list(_[A].keys())
_max = max(_vals)
_vals.index(_max)
A, _[A][_keys[_vals.index(_max)]]
output.append(hash_to_line[A, _keys[_vals.index(_max)]])
print('lines to keep:', output)
用适当的结果存储替换打印。
如何避免一次将 2 × 12 GB 读入内存,但仍处理所有数据?
通过逐块加载这 24 GB 并丢弃您不再需要的数据。由于您的文件是基于行的,因此逐行阅读和处理似乎是明智的。一次在内存中存储 4000 个字符应该不会对现代个人计算机造成问题。
合并文件
您希望最终结果按 A.txt
的行内容排序(甚至排序)。为了不丢失A.txt
和B.txt
行之间的关系,我们需要先合并它们的内容。
- 打开两个文件但尚未阅读它们
- 正在打开一个新文件
AB.txt
进行写入
- 重复以下操作,直到处理完所有
A.txt
和 B.txt
:
- 读一行
A.txt
- 读一行
B.txt
- 将组合内容作为新行附加到
AB.txt
- 丢弃我们目前从记忆中读到的内容
如果您知道某个字符(比如 '\t'
)不能出现在 A.txt
中,您可以使用它作为分隔符:
with \
open("A.txt") as a_file, \
open("B.txt") as b_file, \
open("AB.txt", "w") as ab_file:
for a_line, b_line in zip(a_file, b_file):
# get rid of the line endings, whatever they are
a_line, = a_line.splitlines()
b_line, = b_line.splitlines()
# output the combined content to AB.txt
print(f"{a_line}\t{b_line}", file=ab_file)
(请注意,这依赖于 zip
执行 "lazy" 并返回一个生成器,而不是像 Python 2 中那样完全读取文件并返回一个巨大的列表。)
如果 A.txt
中的所有行都具有相同的固定长度,则根本不需要任何分隔符。 (不过,为了在调试时保持理智,您可能仍想使用一个。)如果您不知道 A.txt
中不能出现的任何字符,您可以创建一个 csv.writer
and use its writerow
method 来将行写入 AB.txt
。它将为您处理所需的转义或引用。
您可能想知道步骤
discard what we've read so far from memory
已实施。这在这里隐式发生,因为保存文件数据的唯一变量 a_line
和 b_line
会在每次迭代时被覆盖。
正在订购组合文件
为了订购整个文件,我们必须将其完全加载到内存中,对吗?
没有。好吧,实际上是的,但同样,不是一次全部。我们可以使用 external sorting. You can try to implement this yourself in Python, or you can just use the UNIX command line tool sort
(manual page),它就是这样做的。在 Linux 或 macOS 系统上,您已经可以使用它。在 Windows 上,它应该包含在任何 UNIX 模拟器中,例如 Cygwin 或 MinGW。如果您已经使用 Windows 的默认 Git 安装程序安装了 Git,您可以使用随附的 "Git Bash".
中的 UNIX sort
请注意,由于我们的内容在每一行中的顺序,文件将首先按来自 A.txt
的内容排序,然后(如果相同)按来自 B.txt
.
计数中
一旦你有了排序后的组合文件,你可以再次逐行处理它,但你必须在行之间保留 一些 数据。
我们要做的是:
- 对于具有相同 A 内容的后续行的每个块:
- 其中,对于具有相同 B 内容的后续行的每个块:
- 计算它的行数
- 统计哪些 B 内容(在 A 内容块内)的行数最多
- 在 A-content 块的末尾:输出包含 A-content 和该 A-content 最常见的 B-content 的一行
因为我们可以依赖上面强加的顺序,所以这将产生想要的结果。
像这样的东西应该可以工作:
- 读一行
- 将其拆分为 A 内容和 B 内容
- 如果A内容与上一行相同*:
- 如果B内容与上一行相同*:
- 增加当前 a-b 内容组合的计数器
- else(即,如果 B 内容与上一行不同):
- 存储到目前为止看到最多的 B 内容及其总数
- (要么是之前最常看到的B内容,要么是上一行的内容)
- 重置当前 a-b 内容组合的计数器
- 将计数器加一(对于当前行)
- 将 B 内容存储在某处,以便我们可以将其与下一次迭代中的下一行的内容进行比较
- else(即,如果 A 内容与上一行不同)**:
- 输出上一行的A内容和上一行出现次数最多的B内容
- 重置当前 a-b 内容组合的计数器
- 重置关于什么是最受关注的 B 内容及其计数的信息
- 存储当前行的A-content和B-content,以便在下一次迭代中将它们与下一行的内容进行比较
- 重复直到处理完整个文件
* 对于第一行,这是隐含的 false
** 当您到达文件末尾时也执行此操作
实际执行此操作是在 Python 作为 reader 的练习者。请注意,您必须在上面的描述中提到的步骤之前定义一些使用的变量,以便它们具有正确的范围。
请注意,您还可以使用 Python 综合标准库的功能比此处描述的更巧妙地执行计数步骤。请参阅 了解一个很好的例子。
基于, counting the combined and sorted lines with nested groupby
:
from itertools import groupby
from operator import itemgetter
# Combine and sort, in reality done like das-g described.
A = 'ABCDACADAEF'
B = '16781918216'
combined_and_sorted = sorted(zip(A, B))
# Count and produce the results
for a, agroup in groupby(combined_and_sorted, itemgetter(0)):
bcounts = ((b, sum(1 for _ in group))
for b, group in groupby(agroup, itemgetter(1)))
print(a, max(bcounts, key=itemgetter(1))[0])
A 1
B 6
C 7
D 8
E 1
F 6
正如我在评论中提到的,我认为使用 shelve
模块可以让你做你想做的事。
这是一个示例实现及其输出。 注意 我在 A.txt
和 B.txt
的末尾添加了四行,以确保在不是第一个遇到的值时选择了正确的值。 另请注意 我在开发和调试代码中留下了您肯定希望在 运行 非常大的输入文件上删除或禁用它。
作为 table,两个输入文件如下所示:
1 A 1
2 B 6
3 C 7
4 D 8
5 A 1
6 C 9
7 A 1
8 D 8
9 A 2
10 E 1
11 F 6
12 G 5
13 G 7
15 G 7
16 G 7
并且,再次作为 table,如果重写这两个输出文件,它们将是这样的:
1 A 1
2 B 6
3 C 7
4 D 8
5 E 1
6 F 6
7 G 7
import glob
from operator import itemgetter, methodcaller
import os
import shelve
shelf_name = 'temp_shelf'
file_a_name = 'A.txt'
file_b_name = 'B.txt'
with open(file_a_name) as file_a, \
open(file_b_name) as file_b, \
shelve.open(shelf_name, flag='n', writeback=False) as shelf:
for line, items in enumerate(zip(file_a, file_b)):
x, fx = map(methodcaller('rstrip'), items) # Remove line endings.
d = shelf.setdefault(x, {}) # Each shelf entry is a regular dict.
d[fx] = d.setdefault(fx, 0) + 1 # Update its value.
shelf[x] = d # Update shelf.
print('{!r} -> {!r}'.format(x, shelf[x]))
# Show what ended up in shelf during development.
print('\nFinal contents of shelf:')
for k, d in shelf.items():
print(' {!r}: {!r}'.format(k, d))
# Change file names to prevent overwriting originals during development.
file_a_name = 'A_updated.txt'
file_b_name = 'B_updated.txt'
comp_name = 'composite.txt' # Used for development only.
# Update files leaving only most frequent combination.
with open(file_a_name, 'w') as file_a, \
open(file_b_name, 'w') as file_b, \
open(comp_name, 'w') as file_c, \
shelve.open(shelf_name, flag='r') as shelf:
for line, (k, d) in enumerate(shelf.items(), 1):
file_a.write(k + '\n')
fx = sorted(d.items(), # Rev sort by number of occurrences.
key=itemgetter(1), reverse=True)[0][0] # Most common fx.
file_b.write(fx + '\n')
file_c.write('{} {} {}\n'.format(line, k, fx)) # For development only.
# CLean up by removing shelf files.
for filename in glob.glob(shelf_name + '.*'):
print('removing', filename)
os.remove(filename)
print('\nDone')
我有两个行数相同的大文本文件(900 万行,每个文件大约 12 GB)。所以它们无法加载到内存中。
table 中显示的这些文本文件中的行如下所示:
我需要删除 A.txt 和 [=39 中的重复项=] 并且只为来自 A.txt 的每一行保留最频繁的组合。如果有 2 个最频繁的行且重复次数相同,程序应选择最先出现在文本中的行并删除所有其他行。
在实际文件中,行不仅仅是 (A,B,C,D,...1,6,7,..),每行大约有 2000 个字符。
table 中显示的最终文本文件应如下所示:
此选项不需要内存中的整个文件,但需要保留一个以 A 为键的字典和多个以 B 为键的字典。如果您可以对值进行散列或分类(为每个唯一的 A 和每个唯一的 B 分配一个 int 值),这可以简化。
编辑:更改字典以使用散列键减少内存占用,但代价是 CPU 并更改输出以显示要保留的行(因为原始 A 和 B 值被混淆)
假设我的文件是:
Lines,A.txt,B.txt
1,A,1
2,A,1
3,A,2
4,B,1
5,B,2
from collections import Counter
from csv import DictReader
_ = {}
_file = DictReader(open('abc.txt', 'r'), delimiter=',')
hash_to_line = {}
for row in _file:
a = hash(row['A.txt'])
b = hash(row['B.txt'])
if a not in _:
_[a] = Counter()
if b not in _[a]:
hash_to_line[(a, b)] = row['Lines']
_[a][b] += 1
output = []
for A in _:
_vals = list(_[A].values())
_keys = list(_[A].keys())
_max = max(_vals)
_vals.index(_max)
A, _[A][_keys[_vals.index(_max)]]
output.append(hash_to_line[A, _keys[_vals.index(_max)]])
print('lines to keep:', output)
用适当的结果存储替换打印。
如何避免一次将 2 × 12 GB 读入内存,但仍处理所有数据?
通过逐块加载这 24 GB 并丢弃您不再需要的数据。由于您的文件是基于行的,因此逐行阅读和处理似乎是明智的。一次在内存中存储 4000 个字符应该不会对现代个人计算机造成问题。
合并文件
您希望最终结果按 A.txt
的行内容排序(甚至排序)。为了不丢失A.txt
和B.txt
行之间的关系,我们需要先合并它们的内容。
- 打开两个文件但尚未阅读它们
- 正在打开一个新文件
AB.txt
进行写入 - 重复以下操作,直到处理完所有
A.txt
和B.txt
:- 读一行
A.txt
- 读一行
B.txt
- 将组合内容作为新行附加到
AB.txt
- 丢弃我们目前从记忆中读到的内容
- 读一行
如果您知道某个字符(比如 '\t'
)不能出现在 A.txt
中,您可以使用它作为分隔符:
with \
open("A.txt") as a_file, \
open("B.txt") as b_file, \
open("AB.txt", "w") as ab_file:
for a_line, b_line in zip(a_file, b_file):
# get rid of the line endings, whatever they are
a_line, = a_line.splitlines()
b_line, = b_line.splitlines()
# output the combined content to AB.txt
print(f"{a_line}\t{b_line}", file=ab_file)
(请注意,这依赖于 zip
执行 "lazy" 并返回一个生成器,而不是像 Python 2 中那样完全读取文件并返回一个巨大的列表。)
如果 A.txt
中的所有行都具有相同的固定长度,则根本不需要任何分隔符。 (不过,为了在调试时保持理智,您可能仍想使用一个。)如果您不知道 A.txt
中不能出现的任何字符,您可以创建一个 csv.writer
and use its writerow
method 来将行写入 AB.txt
。它将为您处理所需的转义或引用。
您可能想知道步骤
discard what we've read so far from memory
已实施。这在这里隐式发生,因为保存文件数据的唯一变量 a_line
和 b_line
会在每次迭代时被覆盖。
正在订购组合文件
为了订购整个文件,我们必须将其完全加载到内存中,对吗?
没有。好吧,实际上是的,但同样,不是一次全部。我们可以使用 external sorting. You can try to implement this yourself in Python, or you can just use the UNIX command line tool sort
(manual page),它就是这样做的。在 Linux 或 macOS 系统上,您已经可以使用它。在 Windows 上,它应该包含在任何 UNIX 模拟器中,例如 Cygwin 或 MinGW。如果您已经使用 Windows 的默认 Git 安装程序安装了 Git,您可以使用随附的 "Git Bash".
sort
请注意,由于我们的内容在每一行中的顺序,文件将首先按来自 A.txt
的内容排序,然后(如果相同)按来自 B.txt
.
计数中
一旦你有了排序后的组合文件,你可以再次逐行处理它,但你必须在行之间保留 一些 数据。
我们要做的是:
- 对于具有相同 A 内容的后续行的每个块:
- 其中,对于具有相同 B 内容的后续行的每个块:
- 计算它的行数
- 统计哪些 B 内容(在 A 内容块内)的行数最多
- 在 A-content 块的末尾:输出包含 A-content 和该 A-content 最常见的 B-content 的一行
因为我们可以依赖上面强加的顺序,所以这将产生想要的结果。
像这样的东西应该可以工作:
- 读一行
- 将其拆分为 A 内容和 B 内容
- 如果A内容与上一行相同*:
- 如果B内容与上一行相同*:
- 增加当前 a-b 内容组合的计数器
- else(即,如果 B 内容与上一行不同):
- 存储到目前为止看到最多的 B 内容及其总数
- (要么是之前最常看到的B内容,要么是上一行的内容)
- 重置当前 a-b 内容组合的计数器
- 将计数器加一(对于当前行)
- 将 B 内容存储在某处,以便我们可以将其与下一次迭代中的下一行的内容进行比较
- 存储到目前为止看到最多的 B 内容及其总数
- 如果B内容与上一行相同*:
- else(即,如果 A 内容与上一行不同)**:
- 输出上一行的A内容和上一行出现次数最多的B内容
- 重置当前 a-b 内容组合的计数器
- 重置关于什么是最受关注的 B 内容及其计数的信息
- 存储当前行的A-content和B-content,以便在下一次迭代中将它们与下一行的内容进行比较
- 重复直到处理完整个文件
* 对于第一行,这是隐含的 false
** 当您到达文件末尾时也执行此操作
实际执行此操作是在 Python 作为 reader 的练习者。请注意,您必须在上面的描述中提到的步骤之前定义一些使用的变量,以便它们具有正确的范围。
请注意,您还可以使用 Python 综合标准库的功能比此处描述的更巧妙地执行计数步骤。请参阅
基于groupby
:
from itertools import groupby
from operator import itemgetter
# Combine and sort, in reality done like das-g described.
A = 'ABCDACADAEF'
B = '16781918216'
combined_and_sorted = sorted(zip(A, B))
# Count and produce the results
for a, agroup in groupby(combined_and_sorted, itemgetter(0)):
bcounts = ((b, sum(1 for _ in group))
for b, group in groupby(agroup, itemgetter(1)))
print(a, max(bcounts, key=itemgetter(1))[0])
A 1
B 6
C 7
D 8
E 1
F 6
正如我在评论中提到的,我认为使用 shelve
模块可以让你做你想做的事。
这是一个示例实现及其输出。 注意 我在 A.txt
和 B.txt
的末尾添加了四行,以确保在不是第一个遇到的值时选择了正确的值。 另请注意 我在开发和调试代码中留下了您肯定希望在 运行 非常大的输入文件上删除或禁用它。
作为 table,两个输入文件如下所示:
1 A 1
2 B 6
3 C 7
4 D 8
5 A 1
6 C 9
7 A 1
8 D 8
9 A 2
10 E 1
11 F 6
12 G 5
13 G 7
15 G 7
16 G 7
并且,再次作为 table,如果重写这两个输出文件,它们将是这样的:
1 A 1
2 B 6
3 C 7
4 D 8
5 E 1
6 F 6
7 G 7
import glob
from operator import itemgetter, methodcaller
import os
import shelve
shelf_name = 'temp_shelf'
file_a_name = 'A.txt'
file_b_name = 'B.txt'
with open(file_a_name) as file_a, \
open(file_b_name) as file_b, \
shelve.open(shelf_name, flag='n', writeback=False) as shelf:
for line, items in enumerate(zip(file_a, file_b)):
x, fx = map(methodcaller('rstrip'), items) # Remove line endings.
d = shelf.setdefault(x, {}) # Each shelf entry is a regular dict.
d[fx] = d.setdefault(fx, 0) + 1 # Update its value.
shelf[x] = d # Update shelf.
print('{!r} -> {!r}'.format(x, shelf[x]))
# Show what ended up in shelf during development.
print('\nFinal contents of shelf:')
for k, d in shelf.items():
print(' {!r}: {!r}'.format(k, d))
# Change file names to prevent overwriting originals during development.
file_a_name = 'A_updated.txt'
file_b_name = 'B_updated.txt'
comp_name = 'composite.txt' # Used for development only.
# Update files leaving only most frequent combination.
with open(file_a_name, 'w') as file_a, \
open(file_b_name, 'w') as file_b, \
open(comp_name, 'w') as file_c, \
shelve.open(shelf_name, flag='r') as shelf:
for line, (k, d) in enumerate(shelf.items(), 1):
file_a.write(k + '\n')
fx = sorted(d.items(), # Rev sort by number of occurrences.
key=itemgetter(1), reverse=True)[0][0] # Most common fx.
file_b.write(fx + '\n')
file_c.write('{} {} {}\n'.format(line, k, fx)) # For development only.
# CLean up by removing shelf files.
for filename in glob.glob(shelf_name + '.*'):
print('removing', filename)
os.remove(filename)
print('\nDone')