并行读取 Python 磁盘中的文件
Reading files from disk in Python in Parallel
我正在从 MATLAB 迁移到 Python,主要是因为 Python 中提供了大量有趣的机器学习包。但是让我感到困惑的问题之一是并行处理。特别是,我想在 for
循环中从磁盘读取数千个文本文件,并且我想并行执行。在 MATLAB 中,使用 parfor
而不是 for
就可以了,但到目前为止我还没有弄清楚如何在 python 中做到这一点。
这是我想做的一个例子。我想读取 N 个文本文件,将它们整形为一个 N1xN2 数组,然后将每个文件保存到一个 NxN1xN2 numpy 数组中。这个数组将是我从函数中 return 得到的。假设文件名为file0001.dat
、file0002.dat
等,我喜欢并行化的代码如下:
import numpy as np
N=10000
N1=200
N2=100
result = np.empty([N, N1, N2])
for counter in range(N):
t_str="%.4d" % counter
filename = 'file_'+t_str+'.dat'
temp_array = np.loadtxt(filename)
temp_array.shape=[N1,N2]
result[counter,:,:]=temp_array
我 运行 集群上的代码,因此我可以使用多个处理器来完成这项工作。因此,欢迎就哪种并行化方法更适合我的任务(如果有多个)发表评论。
注意:我知道这个 post,但在那个 post 中,只有 out1
、out2
、out3
变量需要担心关于,并且它们已明确用作要并行化的函数的参数。但是在这里,我有许多 2D 数组应该从文件中读取并保存到 3D 数组中。所以,这个问题的答案对我的情况来说不够笼统(或者这就是我的理解)。
您可能仍想使用多处理,只是结构略有不同:
from multiprocessing import Pool
import numpy as np
N=10000
N1=200
N2=100
result = np.empty([N, N1, N2])
filenames = ('file_%.4d.dat' % i for i in range(N))
myshaper = lambda fname: np.loadtxt(fname).reshape([N1, nN2])
pool = Pool()
for i, temparray in enumerate(pool.imap(myshaper, filenames)):
result[i, :, :] = temp_array
pool.close()
pool.join()
这样做是首先为 filenames
中的文件名获取一个生成器。这意味着文件名不存储在内存中,但您仍然可以遍历它们。接下来,它创建一个 lambda 函数(相当于 matlab 中的匿名函数)来加载和重塑文件(您也可以使用普通函数)。然后它将该函数应用于使用多个进程的每个文件名,并将结果放入整体数组中。然后它关闭进程。
这个版本使用了一些更地道的python。然而,一种与您原来的方法更相似的方法(虽然不那么惯用)可能会帮助您更好地理解:
from multiprocessing import Pool
import numpy as np
N=10000
N1=200
N2=100
result = np.empty([N, N1, N2])
def proccounter(counter):
t_str="%.4d" % counter
filename = 'file_'+t_str+'.dat'
temp_array = np.loadtxt(filename)
temp_array.shape=[N1,N2]
return counter, temp_array
pool = Pool()
for counter, temp_array in pool.imap(proccounter, range(N)):
result[counter,:,:] = temp_array
pool.close()
pool.join()
这只是将大部分 for
循环拆分为一个函数,使用多个处理器将该函数应用于范围内的每个元素,然后将结果放入数组中。它基本上只是将 for
循环分成两个 for
循环的原始函数。
可以使用 joblib
库完成,如下所示:
def par_func(N1, N2, counter):
import numpy as np
t_str="%.4d" % counter
filename = 'file_'+t_str+'.dat'
temp_array = np.loadtxt(filename)
temp_array.shape=[N1,N2]
# temp_array = np.random.randn(N1, N2) # use this line to test
return temp_array
if __name__ == '__main__':
import numpy as np
N=1000
N1=200
N2=100
from joblib import Parallel, delayed
num_jobs = 2
output_list = Parallel(n_jobs=num_jobs)(delayed(par_func)
(N1, N2, counter)
for counter in range(N))
output_array = np.array(output_list)
我正在从 MATLAB 迁移到 Python,主要是因为 Python 中提供了大量有趣的机器学习包。但是让我感到困惑的问题之一是并行处理。特别是,我想在 for
循环中从磁盘读取数千个文本文件,并且我想并行执行。在 MATLAB 中,使用 parfor
而不是 for
就可以了,但到目前为止我还没有弄清楚如何在 python 中做到这一点。
这是我想做的一个例子。我想读取 N 个文本文件,将它们整形为一个 N1xN2 数组,然后将每个文件保存到一个 NxN1xN2 numpy 数组中。这个数组将是我从函数中 return 得到的。假设文件名为file0001.dat
、file0002.dat
等,我喜欢并行化的代码如下:
import numpy as np
N=10000
N1=200
N2=100
result = np.empty([N, N1, N2])
for counter in range(N):
t_str="%.4d" % counter
filename = 'file_'+t_str+'.dat'
temp_array = np.loadtxt(filename)
temp_array.shape=[N1,N2]
result[counter,:,:]=temp_array
我 运行 集群上的代码,因此我可以使用多个处理器来完成这项工作。因此,欢迎就哪种并行化方法更适合我的任务(如果有多个)发表评论。
注意:我知道这个 post,但在那个 post 中,只有 out1
、out2
、out3
变量需要担心关于,并且它们已明确用作要并行化的函数的参数。但是在这里,我有许多 2D 数组应该从文件中读取并保存到 3D 数组中。所以,这个问题的答案对我的情况来说不够笼统(或者这就是我的理解)。
您可能仍想使用多处理,只是结构略有不同:
from multiprocessing import Pool
import numpy as np
N=10000
N1=200
N2=100
result = np.empty([N, N1, N2])
filenames = ('file_%.4d.dat' % i for i in range(N))
myshaper = lambda fname: np.loadtxt(fname).reshape([N1, nN2])
pool = Pool()
for i, temparray in enumerate(pool.imap(myshaper, filenames)):
result[i, :, :] = temp_array
pool.close()
pool.join()
这样做是首先为 filenames
中的文件名获取一个生成器。这意味着文件名不存储在内存中,但您仍然可以遍历它们。接下来,它创建一个 lambda 函数(相当于 matlab 中的匿名函数)来加载和重塑文件(您也可以使用普通函数)。然后它将该函数应用于使用多个进程的每个文件名,并将结果放入整体数组中。然后它关闭进程。
这个版本使用了一些更地道的python。然而,一种与您原来的方法更相似的方法(虽然不那么惯用)可能会帮助您更好地理解:
from multiprocessing import Pool
import numpy as np
N=10000
N1=200
N2=100
result = np.empty([N, N1, N2])
def proccounter(counter):
t_str="%.4d" % counter
filename = 'file_'+t_str+'.dat'
temp_array = np.loadtxt(filename)
temp_array.shape=[N1,N2]
return counter, temp_array
pool = Pool()
for counter, temp_array in pool.imap(proccounter, range(N)):
result[counter,:,:] = temp_array
pool.close()
pool.join()
这只是将大部分 for
循环拆分为一个函数,使用多个处理器将该函数应用于范围内的每个元素,然后将结果放入数组中。它基本上只是将 for
循环分成两个 for
循环的原始函数。
可以使用 joblib
库完成,如下所示:
def par_func(N1, N2, counter):
import numpy as np
t_str="%.4d" % counter
filename = 'file_'+t_str+'.dat'
temp_array = np.loadtxt(filename)
temp_array.shape=[N1,N2]
# temp_array = np.random.randn(N1, N2) # use this line to test
return temp_array
if __name__ == '__main__':
import numpy as np
N=1000
N1=200
N2=100
from joblib import Parallel, delayed
num_jobs = 2
output_list = Parallel(n_jobs=num_jobs)(delayed(par_func)
(N1, N2, counter)
for counter in range(N))
output_array = np.array(output_list)