Am I parallelizing this right?
I basically want to estimate certain probability distributions from a sample of 5000 matrices. So I just need to count how many times element X occurs at position (i, j) across these 5000 matrices. I then store those [value, count] pairs in a dictionary.
With that in mind, I thought it might be a good idea to parallelize my code, since the serial version runs very slowly.
Here is the code:
import multiprocessing as mp
import numpy as np

def func(N):
    d = {}
    filenames = ["file" + str(k) + ".txt" for k in range(N, N + 1000)]
    ## each of these files is a 306x306 matrix
    for i in range(306):
        data = np.vstack([np.loadtxt(f, delimiter=",", usecols=i) for f in filenames])
        for j in range(306):
            values, counts = np.unique(data.T[j], return_counts=True)
            for i in values:
                d[i] = counts[np.where(values == i)]
    return d

if __name__ == "__main__":
    N = mp.cpu_count()
    with mp.Pool(processes=N) as p:
        results = p.map(func, [m for m in range(1000, 5000, 1000)])
Since this is the first time I have parallelized a function, I would appreciate some feedback. Also, it is still slow because it has to load and stack the 1000x306 arrays, so any suggestions on how to improve it are very welcome.
Based on this description:
how many times element X occurs in the position (i,j) of these 5000 matrices
I would restructure your code to return a 306x306 array of dictionaries, where the keys are the values that occur at that position and the dictionary values are the number of times each occurs. You can then generate the data for subsets of the files in parallel and merge everything at the end. You should tune chunksize to load as many files at once as will fit in RAM, which reduces how many times you have to manually loop over the array indices. Re-ordering the data in memory to "Fortran" order should also make access to arr[:,i,j] more efficient (the calls to np.unique will be faster).
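To see why the Fortran ordering helps, here is a small standalone check (a toy array, not part of the solution below): in Fortran order the 0th axis is the fastest-varying one in memory, so a slice like arr[:, i, j] is contiguous.

import numpy as np

a_c = np.zeros((1000, 5, 5))        # default C order: last axis is contiguous
a_f = np.asfortranarray(a_c)        # Fortran order: first axis is contiguous

print(a_c[:, 2, 3].strides)         # (200,) -> consecutive elements are 25 doubles apart
print(a_f[:, 2, 3].strides)         # (8,)   -> consecutive elements are adjacent in memory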
from collections import defaultdict
import multiprocessing as mp
import numpy as np

arr_shape = (5, 5)
chunksize = 10  # load chunks of `chunksize` files at a time (raise it to as many as fit in RAM)
start_n = 0
end_n = 100

def func(N):
    # unpack args from input tuple
    filenames = ["file" + str(k) + ".txt" for k in range(N[0], N[1])]
    # load and stack all the arrays into a single array
    arr = np.stack([np.loadtxt(f, delimiter=",") for f in filenames])
    # re-order data in memory for efficient access along the 0th axis (not needed, but likely faster)
    arr = np.asfortranarray(arr)
    res = []
    # the more arrays you can load at once (chunksize), the fewer times we have to go through this inefficient loop
    for i in range(arr_shape[0]):
        res.append(list())
        for j in range(arr_shape[1]):
            # each res[i][j] will be a tuple of (values, counts)
            res[i].append(np.unique(arr[:, i, j], return_counts=True))
    return res

if __name__ == "__main__":
    with mp.Pool() as p:
        # build tuples of (start, end) for chunks of arbitrary size
        chunks = []
        for start in range(start_n, end_n, chunksize):
            if start + chunksize > end_n:
                end = end_n
            else:
                end = start + chunksize
            chunks.append((start, end))
        # build array of dicts to merge results into
        d = []
        for i in range(arr_shape[0]):
            d.append(list())
            for j in range(arr_shape[1]):
                # each d[i][j] will be a dict of d[value] = count
                d[i].append(defaultdict(int))  # missing values default to 0
        # call our "func" in parallel, and collect the results as they come in
        for res in p.imap_unordered(func=func, iterable=chunks):
            # merge the results into d
            for i in range(arr_shape[0]):
                for j in range(arr_shape[1]):
                    # recall each result is an array of (values, counts) tuples; zip() is an easy way to pair them up
                    for value, count in zip(res[i][j][0], res[i][j][1]):
                        d[i][j][value] += count
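Since the stated goal is a probability distribution per position, a possible last step (a hypothetical sketch, assuming d has been filled by the merge loop above) is to normalize each position's counts into relative frequencies:

# hypothetical post-processing: convert merged counts into per-position
# probability estimates (value -> count / total observations at that position)
probs = []
for i in range(arr_shape[0]):
    probs.append(list())
    for j in range(arr_shape[1]):
        total = sum(d[i][j].values())
        probs[i].append({value: count / total for value, count in d[i][j].items()})

Note that because the merge step is just addition, the order in which chunks finish does not matter, which is why imap_unordered is safe to use here.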