在多进程共享内存中访问 SeqIO.index 返回的字典
Access SeqIO.index returned dict in multiprocess shared memory
我有一个 FASTA 格式的大文件 (40G)。为了加快这个过程,我有一个使用 pool.map
的并行步骤。首先,SeqIO.index
用于将大文件的索引信息加载到共享内存(使用多处理管理器)。
但是当我运行下面的代码时,程序有时会崩溃(回溯只跟踪到pool.map
),尤其是当运行有很多进程时。如果 运行 喜欢 2 个进程,它可能会成功。但是如果我直接将所有数据(不是索引)加载到内存中(取消注释 "m_records2")程序总是 运行s 成功。我认为差异来自索引部分。任何建议将不胜感激!!!
import sys
import os
from Bio import SeqIO
from subprocess import *
from multiprocessing import Pool, Manager
manager = Manager()
m_records = manager.dict()
#m_records2 = manager.dict()
m_kmers=manager.dict()
def do_operation(seq):
##do some operations with m_kmers
return
def run_check(read_id):
seq=str(m_records[read_id].seq)
#seq=m_records2[read_id]
do_operation(seq)
def check_reads(n_threads):
read_id_list=list(m_records.keys())
#print read_id_list
pool = Pool(n_threads)
m_rslt=pool.map(run_check, read_id_list)
pool.close()
pool.join()
if __name__ == "__main__":
sf_reads=sys.argv[1]
n_threads=int(sys.argv[2])
m_records=SeqIO.index(sf_reads, "fasta")
# for key in m_records:
# m_records2[key]=str(m_records[key].seq)
check_reads(n_threads)
我可以用较小的数据集(所有来自大肠杆菌的蛋白质)重现您的问题,它确实随机发生。
看来问题是在 SeqIO.index
上使用了 manager.dict()
,这是一种不同的类型。
>>> print(type(m_records))
<class 'Bio.File._IndexedSeqFileDict'>
Indexes a sequence file and returns a dictionary like object.
来自source code:
Note that this pseudo dictionary will not support all the methods of
a true Python dictionary, for example values() is not defined since
this would require loading all of the records into memory at once.
如果您使用 SeqIO.to_dict
错误消失,但您可能 运行 内存不足。我不知道你的具体任务,但也许将 FASTA 文件分成更小的块并使用完整的字典可能会解决你的问题。
我有一个 FASTA 格式的大文件 (40G)。为了加快这个过程,我有一个使用 pool.map
的并行步骤。首先,SeqIO.index
用于将大文件的索引信息加载到共享内存(使用多处理管理器)。
但是当我运行下面的代码时,程序有时会崩溃(回溯只跟踪到pool.map
),尤其是当运行有很多进程时。如果 运行 喜欢 2 个进程,它可能会成功。但是如果我直接将所有数据(不是索引)加载到内存中(取消注释 "m_records2")程序总是 运行s 成功。我认为差异来自索引部分。任何建议将不胜感激!!!
import sys
import os
from Bio import SeqIO
from subprocess import *
from multiprocessing import Pool, Manager
manager = Manager()
m_records = manager.dict()
#m_records2 = manager.dict()
m_kmers=manager.dict()
def do_operation(seq):
##do some operations with m_kmers
return
def run_check(read_id):
seq=str(m_records[read_id].seq)
#seq=m_records2[read_id]
do_operation(seq)
def check_reads(n_threads):
read_id_list=list(m_records.keys())
#print read_id_list
pool = Pool(n_threads)
m_rslt=pool.map(run_check, read_id_list)
pool.close()
pool.join()
if __name__ == "__main__":
sf_reads=sys.argv[1]
n_threads=int(sys.argv[2])
m_records=SeqIO.index(sf_reads, "fasta")
# for key in m_records:
# m_records2[key]=str(m_records[key].seq)
check_reads(n_threads)
我可以用较小的数据集(所有来自大肠杆菌的蛋白质)重现您的问题,它确实随机发生。
看来问题是在 SeqIO.index
上使用了 manager.dict()
,这是一种不同的类型。
>>> print(type(m_records))
<class 'Bio.File._IndexedSeqFileDict'>
Indexes a sequence file and returns a dictionary like object.
来自source code:
Note that this pseudo dictionary will not support all the methods of a true Python dictionary, for example values() is not defined since this would require loading all of the records into memory at once.
如果您使用 SeqIO.to_dict
错误消失,但您可能 运行 内存不足。我不知道你的具体任务,但也许将 FASTA 文件分成更小的块并使用完整的字典可能会解决你的问题。