Efficiently handling Python processes
So for my project I have to take a list of genes and purge it of synonyms, e.g. gene A might also be known as AA, so if my original list contains both AA and A I have to delete one of the two.
The gene list is supplied by the user, and I read the synonyms from a text file.
Both are stored in dictionaries.
The list is huuuuuge (Trump joke), and I will have to call this function multiple times in my pipeline. So my question is: can I multiprocess this to make it faster?
My initial approach was the following:
for g in genes:
    process = multiprocessing.Process(target=fixReoccurences,
                                      args=(g, genes, synonyms, replaced))
    my_processes.append(process)
    process.start()
# Wait for *ALL* the processes to finish.
for p in my_processes:
    p.join()
But this approach failed quickly, because my script would need 400 processes, each one running a loop of ~40,000 iterations. It literally froze my laptop.
So, how can I use my CPU's multiple cores efficiently to solve this problem?
I generated some random data and then did a straight replacement:
#!python3
import random
import string
from mark_time import mark  # author's timing helper: mark(msg) prints a timestamped message
synonyms = {'AA': 'A', 'BB': 'B'}
population = list(string.ascii_uppercase)
population[-1] = 'AA'  # replace 'Z' with 'AA'
population[-2] = 'BB'  # replace 'Y' with 'BB'
mark('Building inputs')
inputs = [random.choice(population) for _ in range(40000 * 400)]
mark('... done')
print(' '.join(inputs[:100]))
mark('Building outputs')
outputs = [synonyms.get(ch, ch) for ch in inputs]
mark('... done')
print(' '.join(outputs[:100]))
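(mark_time is not a standard module; a minimal stand-in, assuming mark() just prints the current time and a label, could be:)
# Hypothetical stand-in for the author's mark_time helper:
# mark(msg) prints the current Unix time and a message.
import time
def mark(msg):
    print('[%s] %s' % (round(time.time(), 3), msg))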
My output looked like this:
[1490996255.208] Building inputs
[1490996273.388] ... done
N A U W R W H D E BB V A S B B U W U V S W V E K N Q E R H R A H I V U X V E U G A R D M R S K F O R B B G R C U M C C Q T K G S S H W AA U BB K L W T L H V BB K H J D AA K P G W BB W C U G T P G M J L S J
[1490996273.388] Building outputs
[1490996276.12] ... done
N A U W R W H D E B V A S B B U W U V S W V E K N Q E R H R A H I V U X V E U G A R D M R S K F O R B B G R C U M C C Q T K G S S H W A U B K L W T L H V B K H J D A K P G W B W C U G T P G M J L S J
Building the input data takes 18 seconds, and replacing the synonyms takes only 3 seconds. That is for 400 * 40,000 items. I'm not sure whether your input items are single genes or some kind of SAM sequences or what; more information in the question would be nice. ;-)
I don't think you need multiprocessing for this. Do the data-processing work while you are reading the file.
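As a minimal sketch of that idea (the file name and the one-gene-per-line format are assumptions), the synonym lookup can be applied while the gene file is read:
# Hypothetical: canonicalize each gene as it is read, so duplicates
# introduced by synonyms collapse in the set and no second pass is needed.
synonyms = {'AA': 'A', 'BB': 'B'}
genes = set()
with open('genes.txt') as f:  # assumed format: one gene name per line
    for line in f:
        gene = line.strip()
        if gene:
            genes.add(synonyms.get(gene, gene))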
Update
Sorry for dropping off last night. But, beer.
Anyway, here is some code that will read in a synonyms file with one pair of words per line, e.g. "old new",
and build a dictionary mapping each old -> new word. It then "flattens" the dictionary so that no repeated lookups are needed: each key stores its final value. I think you can use this to read your synonyms file, etc.
def get_synonyms(synfile):
    """Read in a list of 'synonym' pairs, two words per line A -> B.
    Store the pairs in a dict. "Flatten" the dict, so that if A->B and
    B->C, the dict stores A->C and B->C directly. Return the dict.
    """
    syns = {}
    # Read entries from the file (accepts a filename or an open file object)
    with open(synfile) if type(synfile) is str else synfile as sf:
        for line in sf:
            if not line.strip():
                continue
            k, v = line.strip().split()
            syns[k] = v
    # "Flatten" the synonyms. If A -> B and B -> C, then change A -> C
    for k, v in syns.items():
        nv = v
        while nv in syns:
            nv = syns[nv]
        syns[k] = nv
    return syns
import io
synonyms = """
A B
B C
C D
E B
F A
AA G
""".strip()
# with open('synonyms.txt') as synfile:
with io.StringIO(synonyms) as synfile:
    thesaurus = get_synonyms(synfile)
assert sorted(thesaurus.keys()) == "A AA B C E F".split()
assert thesaurus['A'] == 'D'
assert thesaurus['B'] == 'D'
assert thesaurus['C'] == 'D'
assert thesaurus['E'] == 'D'
assert thesaurus['F'] == 'D'
assert thesaurus['AA'] == 'G'
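With the flattened thesaurus, cleaning a gene list can then be a single pass in the same style as the replacement above (the sample list is made up):
genes = ['A', 'AA', 'F', 'X']
cleaned = {thesaurus.get(g, g) for g in genes}  # -> {'D', 'G', 'X'}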
Use Pool.map.
You can have a function that takes a gene and returns either the gene, or None if it should be filtered out:
def filter_gene_if_synonym(gene, synonyms):
    return None if gene in synonyms else gene
You can bind the function's arguments using partial:
from functools import partial
filter_gene = partial(filter_gene_if_synonym,
                      synonyms=synonyms)
The function can then be called with just a gene.
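For example, with a synonyms dict like {'AA': 'A', 'BB': 'B'}:
filter_gene('AA')  # -> None, 'AA' is a known synonym
filter_gene('X')   # -> 'X'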
You can map the function over a sequence of data using a process pool:
from multiprocessing import Pool
pool = Pool(processes=4)
filtered_genes = [gene for gene in pool.map(filter_gene, genes)
                  if gene is not None]
The mapped function can also process a chunk of genes at a time. Note that Pool.map still calls the function once per item even when chunksize is set (chunksize only batches items for transfer between processes), so the chunks have to be built explicitly:
def filter_genes_of_synonyms(genes, synonyms):
    return [gene for gene in genes
            if gene not in synonyms]

filter_genes = partial(filter_genes_of_synonyms, synonyms=synonyms)
and:
chunks = [genes[i:i + 50] for i in range(0, len(genes), 50)]
filtered_chunks = pool.map(filter_genes, chunks)
filtered_genes = [gene for chunk in filtered_chunks
                  for gene in chunk]
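Putting the pieces together, a minimal runnable sketch of this approach might look like the following (the synonym map and gene list are made up):
from functools import partial
from multiprocessing import Pool

def filter_genes_of_synonyms(genes, synonyms):
    # Keep only genes that are not known synonyms of another gene.
    return [gene for gene in genes if gene not in synonyms]

if __name__ == '__main__':
    synonyms = {'AA': 'A', 'BB': 'B'}                 # assumed synonym map
    genes = ['A', 'AA', 'B', 'C', 'BB', 'D'] * 1000   # made-up input
    filter_genes = partial(filter_genes_of_synonyms, synonyms=synonyms)
    chunks = [genes[i:i + 50] for i in range(0, len(genes), 50)]
    with Pool(processes=4) as pool:
        filtered_chunks = pool.map(filter_genes, chunks)
    filtered_genes = [gene for chunk in filtered_chunks
                      for gene in chunk]
    print(len(genes), '->', len(filtered_genes))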