高效地处理 python 个进程

Efficiently handling python processes

所以对于我的项目,我必须获得一个基因列表并从同义词中清除它ex. gene A might also be known as AA, so if in my original list there is AA and A I have to delete one of the two.

基因列表由用户提供,我从文本文件中读取了同义词。

两者都存储在dictionaries中。

列表是 huuuuuge(特朗普笑话),我将不得不在我的管道中多次调用此函数。所以我的问题是:我可以 multiprocess 这让它更快吗?

我最初的做法如下:

for g in genes:
    process = multiprocessing.Process(target = fixReoccurences, args =  (g, genes, synonyms, replaced, ))
    my_processes.append(process)
    process.start()

# Wait for *ALL* the processes to finish.
for p in my_processes:
    p.join()

但这种方法很快就失败了,因为我的脚本需要 400 个进程,所有进程都是 运行 约 40.000 次迭代的循环。从字面上看,它冻结了我的笔记本电脑。

那么,我该如何有效地利用 CPU 的多核处理进程来解决这个问题呢?

我生成了一些随机数据,然后进行了直线替换:

#!python3

import random
import string

from mark_time import mark

synonyms = { 'AA': 'A', 'BB': 'B'}

population = list(string.ascii_uppercase)
population[-1] = 'AA'   # replace 'Z' with AA
population[-2] = 'BB'   # replace Y with BB

mark('Building inputs')
inputs = [random.choice(population) for _ in range(40000 * 400)]
mark('... done')

print(' '.join(inputs[:100]))

mark('Building outputs')
outputs = [synonyms.get(ch, ch) for ch in inputs]
mark('... done')

print(' '.join(outputs[:100]))

我的输出如下所示:

[1490996255.208] Building inputs
[1490996273.388] ... done
N A U W R W H D E BB V A S B B U W U V S W V E K N Q E R H R A H I V U X V E U G A R D M R S K F O R B B G R C U M C C Q T K G S S H W AA U BB K L W T L H V BB K H J D AA K P G W BB W C U G T P G M J L S J
[1490996273.388] Building outputs
[1490996276.12] ... done
N A U W R W H D E B V A S B B U W U V S W V E K N Q E R H R A H I V U X V E U G A R D M R S K F O R B B G R C U M C C Q T K G S S H W A U B K L W T L H V B K H J D A K P G W B W C U G T P G M J L S J

构建输入数据需要 18 秒,替换同义词仅需 3 秒。那是 400 * 40,000 个项目。我不确定您的输入项是单个基因还是某种 SAM 序列或什么。问题中的更多信息可能会很好。 ;-)

我认为您不需要对此进行多处理。在读取文件的同时做好数据处理工作。

更新

抱歉昨晚辍学了。但是,啤酒。

无论如何,这里有一些代码将在同义词文件中读取,每行都有一对单词,例如 "old new", 并构建一个映射每个旧 -> 新单词的字典。然后 "flattens" 字典,这样就不需要重复查找——每个键都存储了它的最终值。我想你可以用它来读取同义词文件等。

def get_synonyms(synfile):
    """Read in a list of 'synonym' pairs, two words per line A -> B.
    Store the pairs in a dict. "Flatten" the dict, so that if A->B and
    B->C, the dict stores A->C and B->C directly. Return the dict.
    """

    syns = {}

    # Read entries from the file
    with open(synfile) if type(synfile) is str else synfile as sf:
        for line in sf:
            if not line.strip(): continue
            k,v = line.strip().split()
            syns[k] = v

    # "flatten" the synonyms. If A -> B and B -> C, then change A -> C
    for k,v in syns.items():
        nv = v
        while nv in syns:
            nv = syns[nv]
        syns[k] = nv

    return syns

import io

synonyms = """
A B
B C
C D
E B
F A
AA G
""".strip()

#with open('synonyms.txt') as synfile:
with io.StringIO(synonyms) as synfile:
    thesaurus = get_synonyms(synfile)

assert sorted(thesaurus.keys()) == "A AA B C E F".split()
assert thesaurus['A'] == 'D'
assert thesaurus['B'] == 'D'
assert thesaurus['C'] == 'D'
assert thesaurus['E'] == 'D'
assert thesaurus['F'] == 'D'
assert thesaurus['AA'] == 'G'

使用Pool.map.

你可以有一个函数,它接受一个基因和 returns 它或者 None 如果它应该被过滤:

def filter_gene_if_synonym(gene, synonyms):
    return None if gene in synonymns else gene

您可以使用 partial:

绑定函数的参数
from functools import partial

filter_gene = partial(filter_gene_if_synonym,
                      synonyms=synonyms)

然后只用一个基因就可以调用函数了。

您可以使用进程池将函数映射到数据序列:

pool = Pool(processes=4)
filtered_genes = [gene for gene in pool.map(filter_gene, genes)
                  if gene is not None]

map 函数还可以将数据块传递给适当的函数:

def filter_genes_of_synonyms(genes, synonyms):
    return [gene for gene in genes
            if gene not in synonymns]

filter_genes = partial(filter_genes, synonyms=synonyms)

和:

filtered_chunks = pool.map(filter_genes, genes, chunksize=50)
filtered_genes = [gene for chunk in filtered_chunks
                  for gene in chunk]