Spark:生成映射词到相似词列表——需要更好的性能
Spark: Generate Map Word to List of Similar Words - Need Better Performance
我正在处理 DNA 序列比对,但遇到性能问题。
我需要创建一个字典,将一个单词(一个设定长度的序列)映射到一个由单独的函数决定的所有相似单词的列表。
现在,我正在做以下事情:
all_words_rdd = sc.parallelize([''.join(word) for word in itertools.product(all_letters, repeat=WORD_SIZE)], PARALLELISM)
all_similar_word_pairs_map = (all_words_rdd.cartesian(all_words_rdd)
.filter(lambda (word1, word2), scoring_matrix=scoring_matrix, threshold_value=threshold_value: areWordsSimilar((word1, word2), scoring_matrix, threshold_value))
.groupByKey()
.mapValues(set)
.collectAsMap())
其中areWordsSimilar
显然是计算单词是否达到设定的相似度阈值。
但是,这太慢了。它适用于长度为 3 的单词,但一旦我再高一点,它就会呈指数级减慢(如您所料)。它还开始抱怨任务规模太大(同样,不足为奇)
我知道笛卡尔连接是执行此操作的一种非常低效的方法,但我不确定如何使用其他方法。
我想从这样的事情开始:
all_words_rdd = (sc.parallelize(xrange(0, len(all_letters) ** WORD_SIZE))
.repartition(PARALLELISM)
...
)
这样我就可以将计算拆分到多个节点。但是,我该如何计算呢?我正在考虑用基数做一些事情并使用模运算符推断字母(即在 len(all_letters)、num % 2 = all_letters[0]
、num % 3 = all_letters[1]
等的基数中)。
然而,这听起来非常复杂,所以我想知道是否有人有更好的方法。
提前致谢。
编辑
我知道我无法降低问题的指数复杂性,这不是我的目标。我的目标是通过让每个节点执行部分计算来分解跨多个执行节点的复杂性。但是,要做到这一点,我需要能够使用一些过程从数字中导出 DNA 词。
一般来说,即使没有 driver 辅助代码,它看起来也是一项无望的任务。序列集的大小呈指数级增长,你根本赢不了。根据您计划如何使用这些数据,很可能有更好的方法。
如果你仍然想这样做,你可以从 driver 和工人之间的 kmers 生成开始:
from itertools import product
def extend_kmer(n, kmer="", alphabet="ATGC"):
"""
>>> list(extend_kmer(2))[:4]
['AA', 'AT', 'AG', 'AC']
"""
tails = product(alphabet, repeat=n)
for tail in tails:
yield kmer + "".join(tail)
def generate_kmers(k, seed_size, alphabet="ATGC"):
"""
>>> kmers = generate_kmers(6, 3, "ATGC").collect()
>>> len(kmers)
4096
>>> sorted(kmers)[0]
'AAAAAA'
"""
seed = sc.parallelize([x for x in extend_kmer(seed_size, "", alphabet)])
return seed.flatMap(lambda kmer: extend_kmer(k - seed_size, kmer, alphabet))
k = ... # Integer
seed_size = ... # Integer <= k
kmers = generate_kmers(k, seed_size) # RDD kmers
在搜索方面,您可以做的最简单的优化是删除 cartesian
并使用本地生成:
from difflib import SequenceMatcher
def is_similar(x, y):
"""Dummy similarity check
>>> is_similar("AAAAA", "AAAAT")
True
>>> is_similar("AAAAA", "TTTTTT")
False
"""
return SequenceMatcher(None, x, y).ratio() > 0.75
def find_similar(kmer, f=is_similar, alphabet="ATGC"):
"""
>>> kmer, similar = find_similar("AAAAAA")
>>> sorted(similar)[:5]
['AAAAAA', 'AAAAAC', 'AAAAAG', 'AAAAAT', 'AAAACA']
"""
candidates = product(alphabet, repeat=len(kmer))
return (kmer, {"".join(x) for x in candidates if is_similar(kmer, x)})
similar_map = kmers.flatmap(find_similar)
这仍然是一种非常幼稚的方法,但它不需要昂贵的数据改组。
接下来您可以尝试改进搜索策略。它可以像上面那样在本地完成,也可以使用连接在全球范围内完成。
在这两种情况下,您都需要一种比检查所有可能的 kmers 更聪明的方法。首先想到的是使用从给定单词中提取的种子 kmers。在本地模式下,这些可以用作候选生成的起点,在全局模式下,连接键(可以选择与哈希结合)。
我正在处理 DNA 序列比对,但遇到性能问题。
我需要创建一个字典,将一个单词(一个设定长度的序列)映射到一个由单独的函数决定的所有相似单词的列表。
现在,我正在做以下事情:
all_words_rdd = sc.parallelize([''.join(word) for word in itertools.product(all_letters, repeat=WORD_SIZE)], PARALLELISM)
all_similar_word_pairs_map = (all_words_rdd.cartesian(all_words_rdd)
.filter(lambda (word1, word2), scoring_matrix=scoring_matrix, threshold_value=threshold_value: areWordsSimilar((word1, word2), scoring_matrix, threshold_value))
.groupByKey()
.mapValues(set)
.collectAsMap())
其中areWordsSimilar
显然是计算单词是否达到设定的相似度阈值。
但是,这太慢了。它适用于长度为 3 的单词,但一旦我再高一点,它就会呈指数级减慢(如您所料)。它还开始抱怨任务规模太大(同样,不足为奇)
我知道笛卡尔连接是执行此操作的一种非常低效的方法,但我不确定如何使用其他方法。
我想从这样的事情开始:
all_words_rdd = (sc.parallelize(xrange(0, len(all_letters) ** WORD_SIZE))
.repartition(PARALLELISM)
...
)
这样我就可以将计算拆分到多个节点。但是,我该如何计算呢?我正在考虑用基数做一些事情并使用模运算符推断字母(即在 len(all_letters)、num % 2 = all_letters[0]
、num % 3 = all_letters[1]
等的基数中)。
然而,这听起来非常复杂,所以我想知道是否有人有更好的方法。
提前致谢。
编辑 我知道我无法降低问题的指数复杂性,这不是我的目标。我的目标是通过让每个节点执行部分计算来分解跨多个执行节点的复杂性。但是,要做到这一点,我需要能够使用一些过程从数字中导出 DNA 词。
一般来说,即使没有 driver 辅助代码,它看起来也是一项无望的任务。序列集的大小呈指数级增长,你根本赢不了。根据您计划如何使用这些数据,很可能有更好的方法。
如果你仍然想这样做,你可以从 driver 和工人之间的 kmers 生成开始:
from itertools import product
def extend_kmer(n, kmer="", alphabet="ATGC"):
"""
>>> list(extend_kmer(2))[:4]
['AA', 'AT', 'AG', 'AC']
"""
tails = product(alphabet, repeat=n)
for tail in tails:
yield kmer + "".join(tail)
def generate_kmers(k, seed_size, alphabet="ATGC"):
"""
>>> kmers = generate_kmers(6, 3, "ATGC").collect()
>>> len(kmers)
4096
>>> sorted(kmers)[0]
'AAAAAA'
"""
seed = sc.parallelize([x for x in extend_kmer(seed_size, "", alphabet)])
return seed.flatMap(lambda kmer: extend_kmer(k - seed_size, kmer, alphabet))
k = ... # Integer
seed_size = ... # Integer <= k
kmers = generate_kmers(k, seed_size) # RDD kmers
在搜索方面,您可以做的最简单的优化是删除 cartesian
并使用本地生成:
from difflib import SequenceMatcher
def is_similar(x, y):
"""Dummy similarity check
>>> is_similar("AAAAA", "AAAAT")
True
>>> is_similar("AAAAA", "TTTTTT")
False
"""
return SequenceMatcher(None, x, y).ratio() > 0.75
def find_similar(kmer, f=is_similar, alphabet="ATGC"):
"""
>>> kmer, similar = find_similar("AAAAAA")
>>> sorted(similar)[:5]
['AAAAAA', 'AAAAAC', 'AAAAAG', 'AAAAAT', 'AAAACA']
"""
candidates = product(alphabet, repeat=len(kmer))
return (kmer, {"".join(x) for x in candidates if is_similar(kmer, x)})
similar_map = kmers.flatmap(find_similar)
这仍然是一种非常幼稚的方法,但它不需要昂贵的数据改组。
接下来您可以尝试改进搜索策略。它可以像上面那样在本地完成,也可以使用连接在全球范围内完成。
在这两种情况下,您都需要一种比检查所有可能的 kmers 更聪明的方法。首先想到的是使用从给定单词中提取的种子 kmers。在本地模式下,这些可以用作候选生成的起点,在全局模式下,连接键(可以选择与哈希结合)。