python 多处理 fastq 函数
python multiprocessing fastq function
我是 Python3 中 mutliprocessing 模块的新用户。
我有 2 个 fastq 文件(正向和反向),我想处理 forward/reverse 几次读取。为此,从正向阅读中,我得到了相应的反向并在这对夫妇上应用了一个具有多个参数的函数。到目前为止,我已经在 1 个线程上按顺序完成了它,这对于大文件来说相当长。现在,我想通过并行化函数应用程序来提高速度,因此我创建了前向文件块并使用多处理将函数应用于每个块。这是代码:
def chunk_itr(iterator, chunk_size):
"""
Function to split fastq file into smallest files for faster processing
From biopython solutions
"""
entry = True
while entry:
chunk = []
while len(chunk) < chunk_size:
try:
entry = next(iterator)
except StopIteration:
entry = None
if entry is None:
break
chunk.append(entry)
if chunk:
yield chunk
def chunk_fastq(f_fastq, chunkSize, path2out):
rec_itr = SeqIO.parse(open(f_fastq), "fastq")
os.mkdir(os.path.join(path2out, "chunk_files"))
dir_out = os.path.join(path2out, "chunk_files")
base = os.path.basename(f_fastq)
fname = os.path.splitext(base)[0]
for i, chunk in enumerate(chunk_itr(rec_itr, chunkSize)):
out_chunk_name = os.path.join(dir_out, "{0}_chunk{1}.fastq".format(fname, i))
with open(out_chunk_name, "w") as handle:
SeqIO.write(chunk, handle, "fastq")
def testmulti(fwd_chunk, rev_idx):
fwd_idx = SeqIO.index(fwd_chunk, "fastq")
for i in fwd_idx:
print(i, rev_idx[i])
pathfwd = "path/to/forward_file"
f_rev = "path/to/rev_fastq"
def main():
rev_idx = SeqIO.index(f_rev, "fastq")
chunk_fastq(pathfwd, 1000, path2chunk)
files = [f for f in os.listdir(path2chunk)]
# sequential
for i in files:
testmulti(i, rev_idx)
# parallel process
processes = []
for i in files:
proc = mp.Process(target=testmulti, args=(i, rev_idx,))
processes.append(proc)
proc.start()
for p in processes:
p.join()
顺序方法工作正常,但并行方法崩溃并出现以下错误:
Process Process-2:
Traceback (most recent call last):
Traceback (most recent call last):
File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "test.py", line 28, in testmulti
print(i, rev_idx[i])
File "test.py", line 28, in testmulti
print(i, rev_idx[i])
File "/home/user/.local/lib/python3.6/site-packages/Bio/File.py", line 417, in __getitem__
record = self._proxy.get(self._offsets[key])
File "/home/user/.local/lib/python3.6/site-packages/Bio/File.py", line 417, in __getitem__
record = self._proxy.get(self._offsets[key])
File "/home/user/.local/lib/python3.6/site-packages/Bio/SeqIO/_index.py", line 69, in get
return self._parse(StringIO(_bytes_to_string(self.get_raw(offset))))
File "/home/user/.local/lib/python3.6/site-packages/Bio/SeqIO/_index.py", line 69, in get
return self._parse(StringIO(_bytes_to_string(self.get_raw(offset))))
File "/home/user/.local/lib/python3.6/site-packages/Bio/SeqIO/_index.py", line 664, in get_raw
raise ValueError("Problem with quality section")
File "/home/user/.local/lib/python3.6/site-packages/Bio/SeqIO/_index.py", line 642, in get_raw
raise ValueError("Premature end of file in seq section")
ValueError: Problem with quality section
ValueError: Premature end of file in seq section
根据 biopython 中的索引 class 描述,文件 format/structure 似乎存在问题
我仔细检查了输入文件,没有错误(并且它适用于顺序方法)。
到目前为止我的猜测:
- 像这样使用 Process 不是一个好的选择(我也试过 pool.starmap,但没有成功)
- 由于 f_rev 被索引一次,然后每个进程都尝试并行使用它,因此存在冲突
如有任何帮助,我们将不胜感激
谢谢!
好的,所以我仍然不能 100% 确定错误的原因,但是在增加我的 fastq 文件的大小后,我能够复制它。
它肯定与使用 SeqIO.index
创建的反向索引对象有关,但是我正在努力从源代码中完全理解它到底是什么样子,因为有很多继承正在进行。我 怀疑 这与将一个打开的文件句柄对象传递给子进程有关,但我再次对这方面的事情不够精通以保证它。
不过我可以成功防止错误。解决方案涉及将您创建的反向索引移动到子进程。我看不出有什么理由不这样做,SeqIO.Index
方法的全部意义在于它创建一个低内存索引而不是将整个文件读入内存,因此应该为每个子进程创建一个不要太贵。
def testmulti(fwd_chunk, rev):
rev_idx = SeqIO.index(rev, "fastq")
fwd_idx = SeqIO.index(fwd_chunk, "fastq")
for i in fwd_idx:
print(i, rev_idx[i])
pathfwd = "path/to/forward_file"
f_rev = "path/to/rev_fastq"
def main():
chunk_fastq(pathfwd, 1000, path2chunk)
files = [f for f in os.listdir(path2chunk)]
# sequential
for i in files:
testmulti(i, f_rev)
# parallel process
processes = []
for i in files:
proc = mp.Process(target=testmulti, args=(i, f_rev,))
processes.append(proc)
proc.start()
for p in processes:
p.join()
我是 Python3 中 mutliprocessing 模块的新用户。 我有 2 个 fastq 文件(正向和反向),我想处理 forward/reverse 几次读取。为此,从正向阅读中,我得到了相应的反向并在这对夫妇上应用了一个具有多个参数的函数。到目前为止,我已经在 1 个线程上按顺序完成了它,这对于大文件来说相当长。现在,我想通过并行化函数应用程序来提高速度,因此我创建了前向文件块并使用多处理将函数应用于每个块。这是代码:
def chunk_itr(iterator, chunk_size):
"""
Function to split fastq file into smallest files for faster processing
From biopython solutions
"""
entry = True
while entry:
chunk = []
while len(chunk) < chunk_size:
try:
entry = next(iterator)
except StopIteration:
entry = None
if entry is None:
break
chunk.append(entry)
if chunk:
yield chunk
def chunk_fastq(f_fastq, chunkSize, path2out):
rec_itr = SeqIO.parse(open(f_fastq), "fastq")
os.mkdir(os.path.join(path2out, "chunk_files"))
dir_out = os.path.join(path2out, "chunk_files")
base = os.path.basename(f_fastq)
fname = os.path.splitext(base)[0]
for i, chunk in enumerate(chunk_itr(rec_itr, chunkSize)):
out_chunk_name = os.path.join(dir_out, "{0}_chunk{1}.fastq".format(fname, i))
with open(out_chunk_name, "w") as handle:
SeqIO.write(chunk, handle, "fastq")
def testmulti(fwd_chunk, rev_idx):
fwd_idx = SeqIO.index(fwd_chunk, "fastq")
for i in fwd_idx:
print(i, rev_idx[i])
pathfwd = "path/to/forward_file"
f_rev = "path/to/rev_fastq"
def main():
rev_idx = SeqIO.index(f_rev, "fastq")
chunk_fastq(pathfwd, 1000, path2chunk)
files = [f for f in os.listdir(path2chunk)]
# sequential
for i in files:
testmulti(i, rev_idx)
# parallel process
processes = []
for i in files:
proc = mp.Process(target=testmulti, args=(i, rev_idx,))
processes.append(proc)
proc.start()
for p in processes:
p.join()
顺序方法工作正常,但并行方法崩溃并出现以下错误:
Process Process-2:
Traceback (most recent call last):
Traceback (most recent call last):
File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "test.py", line 28, in testmulti
print(i, rev_idx[i])
File "test.py", line 28, in testmulti
print(i, rev_idx[i])
File "/home/user/.local/lib/python3.6/site-packages/Bio/File.py", line 417, in __getitem__
record = self._proxy.get(self._offsets[key])
File "/home/user/.local/lib/python3.6/site-packages/Bio/File.py", line 417, in __getitem__
record = self._proxy.get(self._offsets[key])
File "/home/user/.local/lib/python3.6/site-packages/Bio/SeqIO/_index.py", line 69, in get
return self._parse(StringIO(_bytes_to_string(self.get_raw(offset))))
File "/home/user/.local/lib/python3.6/site-packages/Bio/SeqIO/_index.py", line 69, in get
return self._parse(StringIO(_bytes_to_string(self.get_raw(offset))))
File "/home/user/.local/lib/python3.6/site-packages/Bio/SeqIO/_index.py", line 664, in get_raw
raise ValueError("Problem with quality section")
File "/home/user/.local/lib/python3.6/site-packages/Bio/SeqIO/_index.py", line 642, in get_raw
raise ValueError("Premature end of file in seq section")
ValueError: Problem with quality section
ValueError: Premature end of file in seq section
根据 biopython 中的索引 class 描述,文件 format/structure 似乎存在问题 我仔细检查了输入文件,没有错误(并且它适用于顺序方法)。 到目前为止我的猜测:
- 像这样使用 Process 不是一个好的选择(我也试过 pool.starmap,但没有成功)
- 由于 f_rev 被索引一次,然后每个进程都尝试并行使用它,因此存在冲突
如有任何帮助,我们将不胜感激
谢谢!
好的,所以我仍然不能 100% 确定错误的原因,但是在增加我的 fastq 文件的大小后,我能够复制它。
它肯定与使用 SeqIO.index
创建的反向索引对象有关,但是我正在努力从源代码中完全理解它到底是什么样子,因为有很多继承正在进行。我 怀疑 这与将一个打开的文件句柄对象传递给子进程有关,但我再次对这方面的事情不够精通以保证它。
不过我可以成功防止错误。解决方案涉及将您创建的反向索引移动到子进程。我看不出有什么理由不这样做,SeqIO.Index
方法的全部意义在于它创建一个低内存索引而不是将整个文件读入内存,因此应该为每个子进程创建一个不要太贵。
def testmulti(fwd_chunk, rev):
rev_idx = SeqIO.index(rev, "fastq")
fwd_idx = SeqIO.index(fwd_chunk, "fastq")
for i in fwd_idx:
print(i, rev_idx[i])
pathfwd = "path/to/forward_file"
f_rev = "path/to/rev_fastq"
def main():
chunk_fastq(pathfwd, 1000, path2chunk)
files = [f for f in os.listdir(path2chunk)]
# sequential
for i in files:
testmulti(i, f_rev)
# parallel process
processes = []
for i in files:
proc = mp.Process(target=testmulti, args=(i, f_rev,))
processes.append(proc)
proc.start()
for p in processes:
p.join()