Using multiprocessing for file reading in Python 3
I have very large files, each nearly 2 GB, so I want to process multiple files in parallel. I can do this because all the files have a similar format and can be read independently. I know I should use the multiprocessing library, but I'm really confused about how to use it in my code.
My file-reading code is:
def file_reading(file, num_of_sample, segsites, positions, snp_matrix):
    with open(file, buffering=2000009999) as f:
        ### I read the file here. I am not putting that code here.
        try:
            assert len(snp_matrix) == len(positions)
            return positions, snp_matrix  ## return statement
        except:
            print('length of snp matrix and length of position vector not the same.')
            sys.exit(1)
My main block is:
if __name__ == "__main__":
    segsites = []
    positions = []
    snp_matrix = []
    path_to_directory = '/dataset/example/'
    extension = '*.msOut'
    num_of_samples = 162
    filename = glob.glob(path_to_directory + extension)
    ### How can I use multiprocessing with the function file_reading?
    number_of_workers = 10
    x, y, z = [], [], []
    array_of_number_tuple = [(filename[file], segsites, positions, snp_matrix) for file in range(len(filename))]
    with multiprocessing.Pool(number_of_workers) as p:
        pos, snp = p.map(file_reading, array_of_number_tuple)
    x.extend(pos)
    y.extend(snp)
So the inputs to my function are as follows:
- file - a list containing the file names
- num_of_samples - an integer value
- segsites - initially an empty list that I want to append to while reading the file.
- positions - initially an empty list that I want to append to while reading the file.
- snp_matrix - initially an empty list that I want to append to while reading the file.
The function returns the positions list and the snp_matrix list at the end. How can I use multiprocessing when my arguments are lists and an integer? The way I'm using multiprocessing gives me the following error:
TypeError: file_reading() missing 3 required positional arguments: 'segsites', 'positions' and 'snp_matrix'
The elements of the list passed to Pool.map are not unpacked automatically; the 'file_reading' function normally receives only a single argument.
Of course, that argument can be a tuple, and unpacking it yourself is no problem:
def file_reading(args):
    file, num_of_sample, segsites, positions, snp_matrix = args
    with open(file, buffering=2000009999) as f:
        ### I read the file here. I am not putting that code here.
        try:
            assert len(snp_matrix) == len(positions)
            return positions, snp_matrix  ## return statement
        except AssertionError:
            print('length of snp matrix and length of position vector not the same.')
            sys.exit(1)
if __name__ == "__main__":
    segsites = []
    positions = []
    snp_matrix = []
    path_to_directory = '/dataset/example/'
    extension = '*.msOut'
    num_of_samples = 162
    filename = glob.glob(path_to_directory + extension)
    number_of_workers = 10
    x, y = [], []
    array_of_number_tuple = [(name, num_of_samples, segsites, positions, snp_matrix) for name in filename]
    with multiprocessing.Pool(number_of_workers) as p:
        # p.map returns one (positions, snp_matrix) tuple per file,
        # so iterate over the results instead of unpacking the whole list
        results = p.map(file_reading, array_of_number_tuple)
    for pos, snp in results:
        x.extend(pos)
        y.extend(snp)