如何将自定义函数应用于并行配对数组的元素?
How to apply a custom function to pair wise elements of an array in parallel?
我有一个数组,我想将每个元素与其他所有元素进行比较并建立交叉比较 table。它可以通过嵌套的 for 循环轻松实现,但是它的计算时间随着输入数组的大小呈指数增长,所以我想实现一种并行处理方法来减少更大尺寸的时间消耗。
我有一个数组,例如 a = [1,2,3]
,我想应用一个自定义函数,例如:
def add_two_numbers(x,y):
return x+y
一个简单的嵌套 for 循环实现如下:
array = [1,2,3]
matrix = np.zeros([3,3])
for i, one_element in enumerate(array):
for j, other_element in enumerate(array):
matrix[i][j] = add_two_numbers(one_element, other_element)
输出为:
>>> matrix
1 2 3
______________
1 | 2 3 4
2 | 3 4 5
3 | 4 5 6
在 python 中对大型数组应用并行处理的好方法是什么?
我使用 python 多处理库中的进程 class 为 n 元素数组创建 n 个进程,但是每个进程在后端打开一个文件,在 1024 个并行进程之后我得到一个 "Too many open files"例外。而且我必须让矩阵成为一个全局变量,这样每个进程都会更新一个特定的元素。
import multiprocessing as mp
def add_two_numbers_process(one_element, array, i):
global matrix
for j, other_element in enumerate(array):
matrix[i][j] = add_two_numbers(one_element, other_element)
return
processes = []
for i, one_element in enumerate(array):
p = mp.Process(target=add_two_numbers_process, args=(one_element, array, i))
processes.append(p)
p.start()
for process in processes:
process.join()
我也使用了 Pool class,但这比过程 class 花费的时间多 1000 倍,这似乎不可行。
import multiprocessing as mp
def add_two_numbers_pool(one_element, array, i):
row = [0 for x in range(len(array))]
for j, other_element in enumerate(array):
row[j] = add_two_numbers(one_element, other_element)
return row
pool = mp.Pool(mp.cpu_count())
matrix = [pool.apply(add_two_numbers_pool, args=(one_element, array, i)) for i, one_element in enumerate(array)]
pool.close()
我想不出使用 dask 分布式的方法。 dask distributed 在这种情况下是否有用?
作为使用多处理的演示以及矢量化与非矢量化的区别,我们可以从共享代码中的 defining/pulling 开始:
from multiprocessing import Pool
import numpy as np
def add_two_numbers(x,y):
return x+y
# use a large number of values so processing takes some measurable amount of time
values = np.arange(3001)
然后我们可以做你天真的事情:
result = np.empty([len(values)]*2, values.dtype)
for i, x in enumerate(values):
for j, y in enumerate(values):
result[i,j] = add_two_numbers(x, y)
在我的笔记本电脑上大约需要 3.5 秒。然后我们可以将其移动到使用 multiprocessing
Pool
with:
def process_row(x):
output = np.empty_like(values)
for i, y in enumerate(values):
output[i] = add_two_numbers(x, y)
return output
with Pool() as pool:
result = np.array(pool.map(process_row, values))
这需要我大约 1 秒的时间,然后我们可以在 Pool
中将其向量化:
def process_row_vec(x):
return add_two_numbers(values, x)
with Pool() as pool:
result = np.array(pool.map(process_row_vec, values))
这需要 0.25 秒,最后我们可以使用完全矢量化的 numpy 版本:
x, y = np.meshgrid(values, values)
result = add_two_numbers(x, y)
这需要大约 0.09 秒(90 毫秒)。我还意识到,当处理如此大量的元素时,这些中间数组(x
和 y
)会占用大量计算时间,并且对行进行矢量化会更快:
result = np.empty([len(values)]*2, values.dtype)
for i, x in enumerate(values):
result[i,:] = add_two_numbers(x, values)
耗时 0.05 秒(50 毫秒)。
希望这些例子能给你一些关于如何实现你的算法的想法!
我有一个数组,我想将每个元素与其他所有元素进行比较并建立交叉比较 table。它可以通过嵌套的 for 循环轻松实现,但是它的计算时间随着输入数组的大小呈指数增长,所以我想实现一种并行处理方法来减少更大尺寸的时间消耗。
我有一个数组,例如 a = [1,2,3]
,我想应用一个自定义函数,例如:
def add_two_numbers(x,y):
return x+y
一个简单的嵌套 for 循环实现如下:
array = [1,2,3]
matrix = np.zeros([3,3])
for i, one_element in enumerate(array):
for j, other_element in enumerate(array):
matrix[i][j] = add_two_numbers(one_element, other_element)
输出为:
>>> matrix
1 2 3
______________
1 | 2 3 4
2 | 3 4 5
3 | 4 5 6
在 python 中对大型数组应用并行处理的好方法是什么?
我使用 python 多处理库中的进程 class 为 n 元素数组创建 n 个进程,但是每个进程在后端打开一个文件,在 1024 个并行进程之后我得到一个 "Too many open files"例外。而且我必须让矩阵成为一个全局变量,这样每个进程都会更新一个特定的元素。
import multiprocessing as mp
def add_two_numbers_process(one_element, array, i):
global matrix
for j, other_element in enumerate(array):
matrix[i][j] = add_two_numbers(one_element, other_element)
return
processes = []
for i, one_element in enumerate(array):
p = mp.Process(target=add_two_numbers_process, args=(one_element, array, i))
processes.append(p)
p.start()
for process in processes:
process.join()
我也使用了 Pool class,但这比过程 class 花费的时间多 1000 倍,这似乎不可行。
import multiprocessing as mp
def add_two_numbers_pool(one_element, array, i):
row = [0 for x in range(len(array))]
for j, other_element in enumerate(array):
row[j] = add_two_numbers(one_element, other_element)
return row
pool = mp.Pool(mp.cpu_count())
matrix = [pool.apply(add_two_numbers_pool, args=(one_element, array, i)) for i, one_element in enumerate(array)]
pool.close()
我想不出使用 dask 分布式的方法。 dask distributed 在这种情况下是否有用?
作为使用多处理的演示以及矢量化与非矢量化的区别,我们可以从共享代码中的 defining/pulling 开始:
from multiprocessing import Pool
import numpy as np
def add_two_numbers(x,y):
return x+y
# use a large number of values so processing takes some measurable amount of time
values = np.arange(3001)
然后我们可以做你天真的事情:
result = np.empty([len(values)]*2, values.dtype)
for i, x in enumerate(values):
for j, y in enumerate(values):
result[i,j] = add_two_numbers(x, y)
在我的笔记本电脑上大约需要 3.5 秒。然后我们可以将其移动到使用 multiprocessing
Pool
with:
def process_row(x):
output = np.empty_like(values)
for i, y in enumerate(values):
output[i] = add_two_numbers(x, y)
return output
with Pool() as pool:
result = np.array(pool.map(process_row, values))
这需要我大约 1 秒的时间,然后我们可以在 Pool
中将其向量化:
def process_row_vec(x):
return add_two_numbers(values, x)
with Pool() as pool:
result = np.array(pool.map(process_row_vec, values))
这需要 0.25 秒,最后我们可以使用完全矢量化的 numpy 版本:
x, y = np.meshgrid(values, values)
result = add_two_numbers(x, y)
这需要大约 0.09 秒(90 毫秒)。我还意识到,当处理如此大量的元素时,这些中间数组(x
和 y
)会占用大量计算时间,并且对行进行矢量化会更快:
result = np.empty([len(values)]*2, values.dtype)
for i, x in enumerate(values):
result[i,:] = add_two_numbers(x, values)
耗时 0.05 秒(50 毫秒)。
希望这些例子能给你一些关于如何实现你的算法的想法!