在多进程共享内存中访问 SeqIO.index 返回的字典

Access SeqIO.index returned dict in multiprocess shared memory

我有一个 FASTA 格式的大文件 (40G)。为了加快这个过程,我有一个使用 pool.map 的并行步骤。首先,SeqIO.index 用于将大文件的索引信息加载到共享内存(使用多处理管理器)。

但是当我运行下面的代码时,程序有时会崩溃(回溯只跟踪到pool.map),尤其是当运行有很多进程时。如果 运行 喜欢 2 个进程,它可能会成功。但是如果我直接将所有数据(不是索引)加载到内存中(取消注释 "m_records2")程序总是 运行s 成功。我认为差异来自索引部分。任何建议将不胜感激!!!

import sys
import os
from Bio import SeqIO
from subprocess import *
from multiprocessing import Pool, Manager

manager = Manager()
m_records = manager.dict()
#m_records2 = manager.dict()
m_kmers=manager.dict()

def do_operation(seq):
    ##do some operations with m_kmers
    return

def run_check(read_id):
    seq=str(m_records[read_id].seq)
    #seq=m_records2[read_id]
    do_operation(seq)

def check_reads(n_threads):
    read_id_list=list(m_records.keys())
    #print read_id_list
    pool = Pool(n_threads)
    m_rslt=pool.map(run_check, read_id_list)
    pool.close()
    pool.join()

if __name__ == "__main__":
    sf_reads=sys.argv[1]
    n_threads=int(sys.argv[2])
    m_records=SeqIO.index(sf_reads, "fasta")
    # for key in m_records:
    #     m_records2[key]=str(m_records[key].seq)
    check_reads(n_threads)

我可以用较小的数据集(所有来自大肠杆菌的蛋白质)重现您的问题,它确实随机发生。 看来问题是在 SeqIO.index 上使用了 manager.dict(),这是一种不同的类型。

>>> print(type(m_records))
<class 'Bio.File._IndexedSeqFileDict'>

来自documentation:

Indexes a sequence file and returns a dictionary like object.

来自source code

Note that this pseudo dictionary will not support all the methods of a true Python dictionary, for example values() is not defined since this would require loading all of the records into memory at once.

如果您使用 SeqIO.to_dict 错误消失,但您可能 运行 内存不足。我不知道你的具体任务,但也许将 FASTA 文件分成更小的块并使用完整的字典可能会解决你的问题。