Python:在字符串列表中优化搜索子字符串
Python: optimal search for substring in list of strings
我有一个特殊的问题,我想在一个包含多个字符串的列表中搜索多个子字符串。以下是我正在尝试做的事情的要点:
listStrings = [ACDE, CDDE, BPLL, ... ]
listSubstrings = [ACD, BPI, KLJ, ...]
以上条目只是示例。 len(listStrings) 约为 60,000,len(listSubstrings) 约为 50,000-300,000,而 len(listStrings[i]) 介于 10 到 30,000 之间。
我目前的 Python 尝试是:
for i in listSubstrings:
for j in listStrings:
if i in j:
w.write(i+j)
或者类似的东西。虽然这适用于我的任务,但速度非常慢,使用一个内核并需要大约 40 分钟才能完成任务。有没有办法加快速度?
我不相信我可以从 listStrings:listSubstrings 中生成字典,因为有可能需要在两端存储重复的条目(尽管如果我能找到一个为每个标签附加一个唯一标签的方法,因为字典要快得多)。同样,我认为我无法预先计算可能的子字符串。我什至不知道搜索字典键是否比搜索列表更快(因为 dict.get()
将提供特定输入而不是查找子输入)。相对来说在内存中搜索列表就那么慢吗?
您可以使用 built-in 列表函数来加快速度。
for i in listSubstrings:
w.write(list(map(lambda j: i + j, list(lambda j: i in j,listStrings))))
从 运行 时间复杂度分析来看,您的最坏情况似乎是 n^2 次比较,因为根据您当前的问题结构,您需要遍历每个列表。您需要担心的另一个问题是内存消耗,因为随着规模的扩大,更多的内存通常是 bottle-neck.
如您所说,您可能想要索引字符串列表。我们可以知道子字符串列表或字符串列表是否有任何模式?例如,在您的示例中,我们可以索引哪些字符串具有字母表中的哪些字符 {"A": ["ABC", "BAW", "CMAI"]...} 从而我们不需要每次都为每个子字符串元素列表遍历字符串列表。
你的子串都是一样长的吗?您的示例使用 3 个字母的子字符串。在这种情况下,您可以创建一个包含 3 个字母的子字符串作为字符串列表键的字典:
index = {}
for string in listStrings:
for i in range(len(string)-2):
substring = string[i:i+3]
index_strings = index.get(substring, [])
index_strings.append(string)
index[substring] = index_strings
for substring in listSubstrings:
index_strings = index.get(substring, [])
for string in index_strings:
w.write(substring+string)
也许您可以尝试将两个列表之一(最大的?虽然直觉上我会削减 listStrings
)分成较小的列表,然后使用线程 运行 这些并行搜索(Pool
class of multiprocessing
提供了一种方便的方法来做到这一点)?我有一些重要的 speed-up 使用类似的东西:
from multiprocessing import Pool
from itertools import chain, islice
# The function to be run in parallel :
def my_func(strings):
return [j+i for i in strings for j in listSubstrings if i.find(j)>-1]
# A small recipe from itertools to chunk an iterable :
def chunk(it, size):
it = iter(it)
return iter(lambda: tuple(islice(it, size)), ())
# Generating some fake & random value :
from random import randint
listStrings = \
[''.join([chr(randint(65, 90)) for i in range(randint(1, 500))]) for j in range(10000)]
listSubstrings = \
[''.join([chr(randint(65, 90)) for i in range(randint(1, 100))]) for j in range(1000)]
# You have to prepare the searches to be performed:
prep = [strings for strings in chunk(listStrings, round(len(listStrings) / 8))]
with Pool(4) as mp_pool:
# multiprocessing.map is a parallel version of map()
res = mp_pool.map(my_func, prep)
# The `res` variable is a list of list, so now you concatenate them
# in order to have a flat result list
result = list(chain.from_iterable(res))
然后你可以编写整个 result
变量(而不是逐行编写):
with open('result_file', 'w') as f:
f.write('\n'.join(result))
编辑 2018 年 1 月 5 日:按照 ShadowRanger 的建议,使用 itertools.chain.from_iterable
而不是使用 map
side-effects 的丑陋解决方法来展平结果。
对于您正在尝试的事情(在一大堆其他字符串中搜索一组固定的一整串字符串),并行化和微小的调整不会有太大帮助。您需要改进算法。
首先,我建议使用 Aho-Corasick string matching algorithm。基本上,作为从固定字符串集构建匹配器对象的一些预计算工作的交换,您可以一次性扫描另一个字符串以查找这些固定字符串的 all。
因此,与其每次扫描 60K 个字符串 50K+ 次(30 亿次扫描?!?),您可以每次扫描一次,成本仅比普通单次扫描略高,并获得所有匹配项。
最好的部分是,您不是自己编写的。 PyPI(Python 包索引)已经为您编写了 pyahocorasick
包。所以试试吧。
使用示例:
import ahocorasick
listStrings = [ACDE, CDDE, BPLL, ...]
listSubstrings = [ACD, BPI, KLJ, ...]
auto = ahocorasick.Automaton()
for substr in listSubstrings:
auto.add_word(substr, substr)
auto.make_automaton()
...
for astr in listStrings:
for end_ind, found in auto.iter(astr):
w.write(found+astr)
如果在被搜索的字符串 ("haystack") 中多次找到子字符串 ("needle"),这将 write
多次。您可以通过使用 set
到去重来更改循环,使其仅在给定干草堆中给定针的第一次命中时 write
:
for astr in listStrings:
seen = set()
for end_ind, found in auto.iter(astr):
if found not in seen:
seen.add(found)
w.write(found+astr)
您可以进一步调整它,通过将单词的索引存储为或与它们的值,因此您可以对匹配项进行排序(可能是小数字,因此排序开销很小):
from future_builtins import map # Only on Py2, for more efficient generator based map
from itertools import groupby
from operator import itemgetter
auto = ahocorasick.Automaton()
for i, substr in enumerate(listSubstrings):
# Store index and substr so we can recover original ordering
auto.add_word(substr, (i, substr))
auto.make_automaton()
...
for astr in listStrings:
# Gets all hits, sorting by the index in listSubstrings, so we output hits
# in the same order we theoretically searched for them
allfound = sorted(map(itemgetter(1), auto.iter(astr)))
# Using groupby dedups already sorted inputs cheaply; the map throws away
# the index since we don't need it
for found, _ in groupby(map(itemgetter(1), allfound)):
w.write(found+astr)
为了进行性能比较,我在 mgc 的答案中使用了一个变体,该变体更有可能包含匹配项,并且扩大了大海捞针。一、设置代码:
>>> from random import choice, randint
>>> from string import ascii_uppercase as uppercase
>>> # 5000 haystacks, each 1000-5000 characters long
>>> listStrings = [''.join([choice(uppercase) for i in range(randint(1000, 5000))]) for j in range(5000)]
>>> # ~1000 needles (might be slightly less for dups), each 3-12 characters long
>>> listSubstrings = tuple({''.join([choice(uppercase) for i in range(randint(3, 12))]) for j in range(1000)})
>>> auto = ahocorasick.Automaton()
>>> for needle in listSubstrings:
... auto.add_word(needle, needle)
...
>>> auto.make_automaton()
现在开始实际测试(使用 ipython
%timeit
微基准测试魔法):
>>> sum(needle in haystack for haystack in listStrings for needle in listSubstrings)
80279 # Will differ depending on random seed
>>> sum(len(set(map(itemgetter(1), auto.iter(haystack)))) for haystack in listStrings)
80279 # Same behavior after uniquifying results
>>> %timeit -r5 sum(needle in haystack for haystack in listStrings for needle in listSubstrings)
1 loops, best of 5: 9.79 s per loop
>>> %timeit -r5 sum(len(set(map(itemgetter(1), auto.iter(haystack)))) for haystack in listStrings)
1 loops, best of 5: 460 ms per loop
因此,为了在 5000 个中等大小的字符串中的每一个中检查 ~1000 个较小的字符串,pyahocorasick
在我的机器上以 ~21 倍的系数击败了个人成员资格测试。随着 listSubstrings
的大小也增加,它的扩展性也很好;当我以相同的方式初始化它时,但使用 10,000 个较小的字符串而不是 1000 个时,所需的总时间从 ~460 毫秒增加到 ~852 毫秒,这是一个 1.85 倍的时间乘数,可以执行 10 倍的逻辑搜索。
郑重声明,在这种情况下,构建自动机的时间微不足道。您预先支付一次,而不是每个干草堆一次,并且测试显示约 1000 个字符串自动机花费约 1.4 毫秒来构建并占用约 277 KB 的内存(超出字符串本身); ~10000 个字符串自动机花费了~21 毫秒来构建,并占用了~2.45 MB 的内存。
您可以通过将 listString 连接成一个长字符串来显着加快内部循环(或者从文件中读取字符串而不在换行符处拆分它)。
with open('./testStrings.txt') as f:
longString = f.read() # string with seqs separated by \n
with open('./testSubstrings.txt') as f:
listSubstrings = list(f)
def search(longString, listSubstrings):
for n, substring in enumerate(listSubstrings):
offset = longString.find(substring)
while offset >= 0:
yield (substring, offset)
offset = longString.find(substring, offset + 1)
matches = list(search(longString, listSubstrings))
偏移量可以映射回字符串索引。
from bisect import bisect_left
breaks = [n for n,c in enumerate(longString) if c=='\n']
for substring, offset in matches:
stringindex = bisect_left(breaks, offset)
我的测试显示与嵌套 for 循环相比速度提高了 7 倍(11 秒对 77 秒)。
我有一个特殊的问题,我想在一个包含多个字符串的列表中搜索多个子字符串。以下是我正在尝试做的事情的要点:
listStrings = [ACDE, CDDE, BPLL, ... ]
listSubstrings = [ACD, BPI, KLJ, ...]
以上条目只是示例。 len(listStrings) 约为 60,000,len(listSubstrings) 约为 50,000-300,000,而 len(listStrings[i]) 介于 10 到 30,000 之间。
我目前的 Python 尝试是:
for i in listSubstrings:
for j in listStrings:
if i in j:
w.write(i+j)
或者类似的东西。虽然这适用于我的任务,但速度非常慢,使用一个内核并需要大约 40 分钟才能完成任务。有没有办法加快速度?
我不相信我可以从 listStrings:listSubstrings 中生成字典,因为有可能需要在两端存储重复的条目(尽管如果我能找到一个为每个标签附加一个唯一标签的方法,因为字典要快得多)。同样,我认为我无法预先计算可能的子字符串。我什至不知道搜索字典键是否比搜索列表更快(因为 dict.get()
将提供特定输入而不是查找子输入)。相对来说在内存中搜索列表就那么慢吗?
您可以使用 built-in 列表函数来加快速度。
for i in listSubstrings:
w.write(list(map(lambda j: i + j, list(lambda j: i in j,listStrings))))
从 运行 时间复杂度分析来看,您的最坏情况似乎是 n^2 次比较,因为根据您当前的问题结构,您需要遍历每个列表。您需要担心的另一个问题是内存消耗,因为随着规模的扩大,更多的内存通常是 bottle-neck.
如您所说,您可能想要索引字符串列表。我们可以知道子字符串列表或字符串列表是否有任何模式?例如,在您的示例中,我们可以索引哪些字符串具有字母表中的哪些字符 {"A": ["ABC", "BAW", "CMAI"]...} 从而我们不需要每次都为每个子字符串元素列表遍历字符串列表。
你的子串都是一样长的吗?您的示例使用 3 个字母的子字符串。在这种情况下,您可以创建一个包含 3 个字母的子字符串作为字符串列表键的字典:
index = {}
for string in listStrings:
for i in range(len(string)-2):
substring = string[i:i+3]
index_strings = index.get(substring, [])
index_strings.append(string)
index[substring] = index_strings
for substring in listSubstrings:
index_strings = index.get(substring, [])
for string in index_strings:
w.write(substring+string)
也许您可以尝试将两个列表之一(最大的?虽然直觉上我会削减 listStrings
)分成较小的列表,然后使用线程 运行 这些并行搜索(Pool
class of multiprocessing
提供了一种方便的方法来做到这一点)?我有一些重要的 speed-up 使用类似的东西:
from multiprocessing import Pool
from itertools import chain, islice
# The function to be run in parallel :
def my_func(strings):
return [j+i for i in strings for j in listSubstrings if i.find(j)>-1]
# A small recipe from itertools to chunk an iterable :
def chunk(it, size):
it = iter(it)
return iter(lambda: tuple(islice(it, size)), ())
# Generating some fake & random value :
from random import randint
listStrings = \
[''.join([chr(randint(65, 90)) for i in range(randint(1, 500))]) for j in range(10000)]
listSubstrings = \
[''.join([chr(randint(65, 90)) for i in range(randint(1, 100))]) for j in range(1000)]
# You have to prepare the searches to be performed:
prep = [strings for strings in chunk(listStrings, round(len(listStrings) / 8))]
with Pool(4) as mp_pool:
# multiprocessing.map is a parallel version of map()
res = mp_pool.map(my_func, prep)
# The `res` variable is a list of list, so now you concatenate them
# in order to have a flat result list
result = list(chain.from_iterable(res))
然后你可以编写整个 result
变量(而不是逐行编写):
with open('result_file', 'w') as f:
f.write('\n'.join(result))
编辑 2018 年 1 月 5 日:按照 ShadowRanger 的建议,使用 itertools.chain.from_iterable
而不是使用 map
side-effects 的丑陋解决方法来展平结果。
对于您正在尝试的事情(在一大堆其他字符串中搜索一组固定的一整串字符串),并行化和微小的调整不会有太大帮助。您需要改进算法。
首先,我建议使用 Aho-Corasick string matching algorithm。基本上,作为从固定字符串集构建匹配器对象的一些预计算工作的交换,您可以一次性扫描另一个字符串以查找这些固定字符串的 all。
因此,与其每次扫描 60K 个字符串 50K+ 次(30 亿次扫描?!?),您可以每次扫描一次,成本仅比普通单次扫描略高,并获得所有匹配项。
最好的部分是,您不是自己编写的。 PyPI(Python 包索引)已经为您编写了 pyahocorasick
包。所以试试吧。
使用示例:
import ahocorasick
listStrings = [ACDE, CDDE, BPLL, ...]
listSubstrings = [ACD, BPI, KLJ, ...]
auto = ahocorasick.Automaton()
for substr in listSubstrings:
auto.add_word(substr, substr)
auto.make_automaton()
...
for astr in listStrings:
for end_ind, found in auto.iter(astr):
w.write(found+astr)
如果在被搜索的字符串 ("haystack") 中多次找到子字符串 ("needle"),这将 write
多次。您可以通过使用 set
到去重来更改循环,使其仅在给定干草堆中给定针的第一次命中时 write
:
for astr in listStrings:
seen = set()
for end_ind, found in auto.iter(astr):
if found not in seen:
seen.add(found)
w.write(found+astr)
您可以进一步调整它,通过将单词的索引存储为或与它们的值,因此您可以对匹配项进行排序(可能是小数字,因此排序开销很小):
from future_builtins import map # Only on Py2, for more efficient generator based map
from itertools import groupby
from operator import itemgetter
auto = ahocorasick.Automaton()
for i, substr in enumerate(listSubstrings):
# Store index and substr so we can recover original ordering
auto.add_word(substr, (i, substr))
auto.make_automaton()
...
for astr in listStrings:
# Gets all hits, sorting by the index in listSubstrings, so we output hits
# in the same order we theoretically searched for them
allfound = sorted(map(itemgetter(1), auto.iter(astr)))
# Using groupby dedups already sorted inputs cheaply; the map throws away
# the index since we don't need it
for found, _ in groupby(map(itemgetter(1), allfound)):
w.write(found+astr)
为了进行性能比较,我在 mgc 的答案中使用了一个变体,该变体更有可能包含匹配项,并且扩大了大海捞针。一、设置代码:
>>> from random import choice, randint
>>> from string import ascii_uppercase as uppercase
>>> # 5000 haystacks, each 1000-5000 characters long
>>> listStrings = [''.join([choice(uppercase) for i in range(randint(1000, 5000))]) for j in range(5000)]
>>> # ~1000 needles (might be slightly less for dups), each 3-12 characters long
>>> listSubstrings = tuple({''.join([choice(uppercase) for i in range(randint(3, 12))]) for j in range(1000)})
>>> auto = ahocorasick.Automaton()
>>> for needle in listSubstrings:
... auto.add_word(needle, needle)
...
>>> auto.make_automaton()
现在开始实际测试(使用 ipython
%timeit
微基准测试魔法):
>>> sum(needle in haystack for haystack in listStrings for needle in listSubstrings)
80279 # Will differ depending on random seed
>>> sum(len(set(map(itemgetter(1), auto.iter(haystack)))) for haystack in listStrings)
80279 # Same behavior after uniquifying results
>>> %timeit -r5 sum(needle in haystack for haystack in listStrings for needle in listSubstrings)
1 loops, best of 5: 9.79 s per loop
>>> %timeit -r5 sum(len(set(map(itemgetter(1), auto.iter(haystack)))) for haystack in listStrings)
1 loops, best of 5: 460 ms per loop
因此,为了在 5000 个中等大小的字符串中的每一个中检查 ~1000 个较小的字符串,pyahocorasick
在我的机器上以 ~21 倍的系数击败了个人成员资格测试。随着 listSubstrings
的大小也增加,它的扩展性也很好;当我以相同的方式初始化它时,但使用 10,000 个较小的字符串而不是 1000 个时,所需的总时间从 ~460 毫秒增加到 ~852 毫秒,这是一个 1.85 倍的时间乘数,可以执行 10 倍的逻辑搜索。
郑重声明,在这种情况下,构建自动机的时间微不足道。您预先支付一次,而不是每个干草堆一次,并且测试显示约 1000 个字符串自动机花费约 1.4 毫秒来构建并占用约 277 KB 的内存(超出字符串本身); ~10000 个字符串自动机花费了~21 毫秒来构建,并占用了~2.45 MB 的内存。
您可以通过将 listString 连接成一个长字符串来显着加快内部循环(或者从文件中读取字符串而不在换行符处拆分它)。
with open('./testStrings.txt') as f:
longString = f.read() # string with seqs separated by \n
with open('./testSubstrings.txt') as f:
listSubstrings = list(f)
def search(longString, listSubstrings):
for n, substring in enumerate(listSubstrings):
offset = longString.find(substring)
while offset >= 0:
yield (substring, offset)
offset = longString.find(substring, offset + 1)
matches = list(search(longString, listSubstrings))
偏移量可以映射回字符串索引。
from bisect import bisect_left
breaks = [n for n,c in enumerate(longString) if c=='\n']
for substring, offset in matches:
stringindex = bisect_left(breaks, offset)
我的测试显示与嵌套 for 循环相比速度提高了 7 倍(11 秒对 77 秒)。