Python:在字符串列表中优化搜索子字符串

Python: optimal search for substring in list of strings

我有一个特殊的问题,我想在一个包含多个字符串的列表中搜索多个子字符串。以下是我正在尝试做的事情的要点:

listStrings = [ACDE, CDDE, BPLL, ... ]

listSubstrings = [ACD, BPI, KLJ, ...]

以上条目只是示例。 len(listStrings) 约为 60,000,len(listSubstrings) 约为 50,000-300,000,而 len(listStrings[i]) 介于 10 到 30,000 之间。

我目前的 Python 尝试是:

for i in listSubstrings:
   for j in listStrings:
       if i in j:
          w.write(i+j)

或者类似的东西。虽然这适用于我的任务,但速度非常慢,使用一个内核并需要大约 40 分钟才能完成任务。有没有办法加快速度?

我不相信我可以从 listStrings:listSubstrings 中生成字典,因为有可能需要在两端存储重复的条目(尽管如果我能找到一个为每个标签附加一个唯一标签的方法,因为字典要快得多)。同样,我认为我无法预先计算可能的子字符串。我什至不知道搜索字典键是否比搜索列表更快(因为 dict.get() 将提供特定输入而不是查找子输入)。相对来说在内存中搜索列表就那么慢吗?

您可以使用 built-in 列表函数来加快速度。

for i in listSubstrings:
   w.write(list(map(lambda j: i + j, list(lambda j: i in j,listStrings))))

从 运行 时间复杂度分析来看,您的最坏情况似乎是 n^2 次比较,因为根据您当前的问题结构,您需要遍历每个列表。您需要担心的另一个问题是内存消耗,因为随着规模的扩大,更多的内存通常是 bottle-neck.

如您所说,您可能想要索引字符串列表。我们可以知道子字符串列表或字符串列表是否有任何模式?例如,在您的示例中,我们可以索引哪些字符串具有字母表中的哪些字符 {"A": ["ABC", "BAW", "CMAI"]...} 从而我们不需要每次都为每个子字符串元素列表遍历字符串列表。

你的子串都是一样长的吗?您的示例使用 3 个字母的子字符串。在这种情况下,您可以创建一个包含 3 个字母的子字符串作为字符串列表键的字典:

index = {}
for string in listStrings:
    for i in range(len(string)-2):
        substring = string[i:i+3]
        index_strings = index.get(substring, [])
        index_strings.append(string)
        index[substring] = index_strings

for substring in listSubstrings:
    index_strings = index.get(substring, [])
    for string in index_strings:
        w.write(substring+string)

也许您可以尝试将两个列表之一(最大的?虽然直觉上我会削减 listStrings)分成较小的列表,然后使用线程 运行 这些并行搜索(Pool class of multiprocessing 提供了一种方便的方法来做到这一点)?我有一些重要的 speed-up 使用类似的东西:

from multiprocessing import Pool
from itertools import chain, islice

# The function to be run in parallel :
def my_func(strings):
    return [j+i for i in strings for j in listSubstrings if i.find(j)>-1]

# A small recipe from itertools to chunk an iterable :
def chunk(it, size):
    it = iter(it)
    return iter(lambda: tuple(islice(it, size)), ())

# Generating some fake & random value :
from random import randint
listStrings = \
    [''.join([chr(randint(65, 90)) for i in range(randint(1, 500))]) for j in range(10000)]
listSubstrings = \
    [''.join([chr(randint(65, 90)) for i in range(randint(1, 100))]) for j in range(1000)]

# You have to prepare the searches to be performed:
prep = [strings for strings in chunk(listStrings, round(len(listStrings) / 8))]
with Pool(4) as mp_pool:
    # multiprocessing.map is a parallel version of map()
    res = mp_pool.map(my_func, prep)
# The `res` variable is a list of list, so now you concatenate them
# in order to have a flat result list
result = list(chain.from_iterable(res))

然后你可以编写整个 result 变量(而不是逐行编写):

with open('result_file', 'w') as f:
    f.write('\n'.join(result))

编辑 2018 年 1 月 5 日:按照 ShadowRanger 的建议,使用 itertools.chain.from_iterable 而不是使用 map side-effects 的丑陋解决方法来展平结果。

对于您正在尝试的事情(在一大堆其他字符串中搜索一组固定的一整串字符串),并行化和微小的调整不会有太大帮助。您需要改进算法。

首先,我建议使用 Aho-Corasick string matching algorithm。基本上,作为从固定字符串集构建匹配器对象的一些预计算工作的交换,您可以一次性扫描另一个字符串以查找这些固定字符串的 all

因此,与其每次扫描 60K 个字符串 50K+ 次(30 亿次扫描?!?),您可以每次扫描一次,成本仅比普通单次扫描略高,并获得所有匹配项。

最好的部分是,您不是自己编写的。 PyPI(Python 包索引)已经为您编写了 pyahocorasick 包。所以试试吧。

使用示例:

import ahocorasick

listStrings = [ACDE, CDDE, BPLL, ...]
listSubstrings = [ACD, BPI, KLJ, ...]

auto = ahocorasick.Automaton()
for substr in listSubstrings:
    auto.add_word(substr, substr)
auto.make_automaton()

...

for astr in listStrings:
    for end_ind, found in auto.iter(astr):
        w.write(found+astr)

如果在被搜索的字符串 ("haystack") 中多次找到子字符串 ("needle"),这将 write 多次。您可以通过使用 set 到去重来更改循环,使其仅在给定干草堆中给定针的第一次命中时 write

for astr in listStrings:
    seen = set()
    for end_ind, found in auto.iter(astr):
        if found not in seen:
            seen.add(found)
            w.write(found+astr)

您可以进一步调整它,通过将单词的索引存储为或与它们的值,因此您可以对匹配项进行排序(可能是小数字,因此排序开销很小):

from future_builtins import map  # Only on Py2, for more efficient generator based map
from itertools import groupby
from operator import itemgetter

auto = ahocorasick.Automaton()
for i, substr in enumerate(listSubstrings):
    # Store index and substr so we can recover original ordering
    auto.add_word(substr, (i, substr))
auto.make_automaton()

...

for astr in listStrings:
    # Gets all hits, sorting by the index in listSubstrings, so we output hits
    # in the same order we theoretically searched for them
    allfound = sorted(map(itemgetter(1), auto.iter(astr)))
    # Using groupby dedups already sorted inputs cheaply; the map throws away
    # the index since we don't need it
    for found, _ in groupby(map(itemgetter(1), allfound)):
        w.write(found+astr)

为了进行性能比较,我在 mgc 的答案中使用了一个变体,该变体更有可能包含匹配项,并且扩大了大海捞针。一、设置代码:

>>> from random import choice, randint
>>> from string import ascii_uppercase as uppercase
>>> # 5000 haystacks, each 1000-5000 characters long
>>> listStrings = [''.join([choice(uppercase) for i in range(randint(1000, 5000))]) for j in range(5000)]
>>> # ~1000 needles (might be slightly less for dups), each 3-12 characters long
>>> listSubstrings = tuple({''.join([choice(uppercase) for i in range(randint(3, 12))]) for j in range(1000)})
>>> auto = ahocorasick.Automaton()
>>> for needle in listSubstrings:
...     auto.add_word(needle, needle)
...
>>> auto.make_automaton()

现在开始实际测试(使用 ipython %timeit 微基准测试魔法):

>>> sum(needle in haystack for haystack in listStrings for needle in listSubstrings)
80279  # Will differ depending on random seed
>>> sum(len(set(map(itemgetter(1), auto.iter(haystack)))) for haystack in listStrings)
80279  # Same behavior after uniquifying results
>>> %timeit -r5 sum(needle in haystack for haystack in listStrings for needle in listSubstrings)
1 loops, best of 5: 9.79 s per loop
>>> %timeit -r5 sum(len(set(map(itemgetter(1), auto.iter(haystack)))) for haystack in listStrings)
1 loops, best of 5: 460 ms per loop

因此,为了在 5000 个中等大小的字符串中的每一个中检查 ~1000 个较小的字符串,pyahocorasick 在我的机器上以 ~21 倍的系数击败了个人成员资格测试。随着 listSubstrings 的大小也增加,它的扩展性也很好;当我以相同的方式初始化它时,但使用 10,000 个较小的字符串而不是 1000 个时,所需的总时间从 ~460 毫秒增加到 ~852 毫秒,这是一个 1.85 倍的时间乘数,可以执行 10 倍的逻辑搜索。

郑重声明,在这种情况下,构建自动机的时间微不足道。您预先支付一次,而不是每个干草堆一次,并且测试显示约 1000 个字符串自动机花费约 1.4 毫秒来构建并占用约 277 KB 的内存(超出字符串本身); ~10000 个字符串自动机花费了~21 毫秒来构建,并占用了~2.45 MB 的内存。

您可以通过将 listString 连接成一个长字符串来显着加快内部循环(或者从文件中读取字符串而不在换行符处拆分它)。

with open('./testStrings.txt') as f:
    longString = f.read()               # string with seqs separated by \n

with open('./testSubstrings.txt') as f:
    listSubstrings = list(f)

def search(longString, listSubstrings):
    for n, substring in enumerate(listSubstrings):
        offset = longString.find(substring)
        while offset >= 0:
            yield (substring, offset)
            offset = longString.find(substring, offset + 1)

matches = list(search(longString, listSubstrings))

偏移量可以映射回字符串索引。

from bisect import bisect_left
breaks = [n for n,c in enumerate(longString) if c=='\n']

for substring, offset in matches:
    stringindex = bisect_left(breaks, offset)

我的测试显示与嵌套 for 循环相比速度提高了 7 倍(11 秒对 77 秒)。