如何按出现次数删除两个大文本文件中的重复行?

How to remove duplicate lines in two large text files by number of appearance?

我有两个行数相同的大文本文件(900 万行,每个文件大约 12 GB)。所以它们无法加载到内存中。

table 中显示的这些文本文件中的行如下所示:

我需要删除 A.txt[=39 中的重复项=] 并且只为来自 A.txt 的每一行保留最频繁的组合。如果有 2 个最频繁的行且重复次数相同,程序应选择最先出现在文本中的行并删除所有其他行。

在实际文件中,行不仅仅是 (A,B,C,D,...1,6,7,..),每行大约有 2000 个字符。

table 中显示的最终文本文件应如下所示:

此选项不需要内存中的整个文件,但需要保留一个以 A 为键的字典和多个以 B 为键的字典。如果您可以对值进行散列或分类(为每个唯一的 A 和每个唯一的 B 分配一个 int 值),这可以简化。

编辑:更改字典以使用散列键减少内存占用,但代价是 CPU 并更改输出以显示要保留的行(因为原始 A 和 B 值被混淆)

假设我的文件是:

Lines,A.txt,B.txt
1,A,1
2,A,1
3,A,2
4,B,1
5,B,2
from collections import Counter
from csv import DictReader
_ = {}
_file = DictReader(open('abc.txt', 'r'), delimiter=',')

hash_to_line = {}
for row in _file:
    a = hash(row['A.txt'])
    b = hash(row['B.txt'])
    if a not in _:
        _[a] = Counter()
    if b not in _[a]:
        hash_to_line[(a, b)] = row['Lines']
    _[a][b] += 1

output = []
for A in _:
    _vals = list(_[A].values())
    _keys = list(_[A].keys())
    _max = max(_vals)

    _vals.index(_max)
    A, _[A][_keys[_vals.index(_max)]]
    output.append(hash_to_line[A, _keys[_vals.index(_max)]])
print('lines to keep:', output)

用适当的结果存储替换打印。

如何避免一次将 2 × 12 GB 读入内存,但仍处理所有数据?

通过逐块加载这 24 GB 并丢弃您不再需要的数据。由于您的文件是基于行的,因此逐行阅读和处理似乎是明智的。一次在内存中存储 4000 个字符应该不会对现代个人计算机造成问题。

合并文件

您希望最终结果按 A.txt 的行内容排序(甚至排序)。为了不丢失A.txtB.txt行之间的关系,我们需要先合并它们的内容。

  • 打开两个文件但尚未阅读它们
  • 正在打开一个新文件AB.txt 进行写入
  • 重复以下操作,直到处理完所有 A.txtB.txt
    • 读一行A.txt
    • 读一行B.txt
    • 将组合内容作为新行附加到 AB.txt
    • 丢弃我们目前从记忆中读到的内容

如果您知道某个字符(比如 '\t')不能出现在 A.txt 中,您可以使用它作为分隔符:

with \
        open("A.txt") as a_file, \
        open("B.txt") as b_file, \
        open("AB.txt", "w") as ab_file:
    for a_line, b_line in zip(a_file, b_file):
        # get rid of the line endings, whatever they are
        a_line, = a_line.splitlines()
        b_line, = b_line.splitlines()

        # output the combined content to AB.txt
        print(f"{a_line}\t{b_line}", file=ab_file)

(请注意,这依赖于 zip 执行 "lazy" 并返回一个生成器,而不是像 Python 2 中那样完全读取文件并返回一个巨大的列表。)

如果 A.txt 中的所有行都具有相同的固定长度,则根本不需要任何分隔符。 (不过,为了在调试时保持理智,您可能仍想使用一个。)如果您不知道 A.txt 中不能出现的任何字符,您可以创建一个 csv.writer and use its writerow method 来将行写入 AB.txt。它将为您处理所需的转义或引用。

您可能想知道步骤

discard what we've read so far from memory

已实施。这在这里隐式发生,因为保存文件数据的唯一变量 a_lineb_line 会在每次迭代时被覆盖。

正在订购组合文件

为了订购整个文件,我们必须将其完全加载到内存中,对吗?

没有。好吧,实际上是的,但同样,不是一次全部。我们可以使用 external sorting. You can try to implement this yourself in Python, or you can just use the UNIX command line tool sort (manual page),它就是这样做的。在 Linux 或 macOS 系统上,您已经可以使用它。在 Windows 上,它应该包含在任何 UNIX 模拟器中,例如 Cygwin 或 MinGW。如果您已经使用 Windows 的默认 Git 安装程序安装了 Git,您可以使用随附的 "Git Bash".

中的 UNIX sort

请注意,由于我们的内容在每一行中的顺序,文件将首先按来自 A.txt 的内容排序,然后(如果相同)按来自 B.txt.

计数中

一旦你有了排序后的组合文件,你可以再次逐行处理它,但你必须在行之间保留 一些 数据。

我们要做的是:

  • 对于具有相同 A 内容的后续行的每个块:
    • 其中,对于具有相同 B 内容的后续行的每个块:
    • 计算它的行​​数
    • 统计哪些 B 内容(在 A 内容块内)的行数最多
    • 在 A-content 块的末尾:输出包含 A-content 和该 A-content 最常见的 B-content 的一行

因为我们可以依赖上面强加的顺序,所以这将产生想要的结果。

像这样的东西应该可以工作:

  • 读一行
  • 将其拆分为 A 内容和 B 内容
  • 如果A内容与上一行相同*:
    • 如果B内容与上一行相同*:
      • 增加当前 a-b 内容组合的计数器
    • else(即,如果 B 内容与上一行不同):
      • 存储到目前为止看到最多的 B 内容及其总数
        • (要么是之前最常看到的B内容,要么是上一行的内容)
      • 重置当前 a-b 内容组合的计数器
      • 将计数器加一(对于当前行)
      • 将 B 内容存储在某处,以便我们可以将其与下一次迭代中的下一行的内容进行比较
  • else(即,如果 A 内容与上一行不同)**:
    • 输出上一行的A内容和上一行出现次数最多的B内容
    • 重置当前 a-b 内容组合的计数器
    • 重置关于什么是最受关注的 B 内容及其计数的信息
    • 存储当前行的A-content和B-content,以便在下一次迭代中将它们与下一行的内容进行比较
  • 重复直到处理完整个文件

* 对于第一行,这是隐含的 false

** 当您到达文件末尾时也执行此操作

实际执行此操作是在 Python 作为 reader 的练习者。请注意,您必须在上面的描述中提到的步骤之前定义一些使用的变量,以便它们具有正确的范围。

请注意,您还可以使用 Python 综合标准库的功能比此处描述的更巧妙地执行计数步骤。请参阅 了解一个很好的例子。

基于, counting the combined and sorted lines with nested groupby

from itertools import groupby
from operator import itemgetter

# Combine and sort, in reality done like das-g described.
A = 'ABCDACADAEF'
B = '16781918216'
combined_and_sorted = sorted(zip(A, B))

# Count and produce the results
for a, agroup in groupby(combined_and_sorted, itemgetter(0)):
    bcounts = ((b, sum(1 for _ in group))
               for b, group in groupby(agroup, itemgetter(1)))
    print(a, max(bcounts, key=itemgetter(1))[0])

Output:

A 1
B 6
C 7
D 8
E 1
F 6

正如我在评论中提到的,我认为使用 shelve 模块可以让你做你想做的事。

这是一个示例实现及其输出。 注意 我在 A.txtB.txt 的末尾添加了四行,以确保在不是第一个遇到的值时选择了正确的值。 另请注意 我在开发和调试代码中留下了您肯定希望在 运行 非常大的输入文件上删除或禁用它。

作为 table,两个输入文件如下所示:

 1 A 1
 2 B 6
 3 C 7
 4 D 8
 5 A 1
 6 C 9
 7 A 1
 8 D 8
 9 A 2
10 E 1
11 F 6
12 G 5
13 G 7
15 G 7
16 G 7

并且,再次作为 table,如果重写这两个输出文件,它们将是这样的:

1 A 1
2 B 6
3 C 7
4 D 8
5 E 1
6 F 6
7 G 7

import glob
from operator import itemgetter, methodcaller
import os
import shelve


shelf_name = 'temp_shelf'
file_a_name = 'A.txt'
file_b_name = 'B.txt'


with open(file_a_name) as file_a, \
     open(file_b_name) as file_b, \
     shelve.open(shelf_name, flag='n', writeback=False) as shelf:

    for line, items in enumerate(zip(file_a, file_b)):
        x, fx = map(methodcaller('rstrip'), items)  # Remove line endings.
        d = shelf.setdefault(x, {})      # Each shelf entry is a regular dict.
        d[fx] = d.setdefault(fx, 0) + 1  # Update its value.
        shelf[x] = d                     # Update shelf.

        print('{!r} -> {!r}'.format(x, shelf[x]))

    # Show what ended up in shelf during development.
    print('\nFinal contents of shelf:')
    for k, d in shelf.items():
        print('  {!r}: {!r}'.format(k, d))

# Change file names to prevent overwriting originals during development.
file_a_name = 'A_updated.txt'
file_b_name = 'B_updated.txt'
comp_name = 'composite.txt'  # Used for development only.

# Update files leaving only most frequent combination.
with open(file_a_name, 'w') as file_a, \
     open(file_b_name, 'w') as file_b, \
     open(comp_name, 'w') as file_c, \
     shelve.open(shelf_name, flag='r') as shelf:

    for line, (k, d) in enumerate(shelf.items(), 1):
        file_a.write(k + '\n')
        fx = sorted(d.items(),  # Rev sort by number of occurrences.
                    key=itemgetter(1), reverse=True)[0][0]  # Most common fx.
        file_b.write(fx + '\n')
        file_c.write('{} {} {}\n'.format(line, k, fx))  # For development only.

# CLean up by removing shelf files.
for filename in glob.glob(shelf_name + '.*'):
    print('removing', filename)
    os.remove(filename)

print('\nDone')