Applying multiprocessing to different versions of multiprocessing
I want to run all of the formating calls concurrently using the multiprocessing functionality. How can I add this so that the calls below run concurrently? I tried to do it, but it is not working. Essentially, I want to run formating(Numbers, limit1), formating(Numbers, limit2) and formating(Numbers, limit3) at the same time.
Code:
import multiprocessing as mp
import numpy as np
def formating(a, b):
    # Formating goes here
    x = np.sort(b)
    # digitize
    l = np.digitize(a, x)
    # output:
    result = np.bincount(l, weights=a)
    return result
Numbers = np.array([3, 4, 5, 7, 8, 10, 20])
limit1 = np.array([0, 2, 5, 12, 15])
limit2 = np.array([0, 2, 5, 12])
limit3 = np.array([0, 2, 5, 12, 15, 22])

formating(Numbers, limit1)
formating(Numbers, limit2)
formating(Numbers, limit3)
I can tell you how to do it, but you will be very disappointed with the results. There are two issues. The first is that there is a certain amount of overhead just in creating the process pool and in passing arguments to the worker function and getting back its return value, because the worker function "lives" in a different address space and the arguments and return values must be "pickled" and "unpickled" for transmission. So your worker function, formating, needs to be non-trivial for the overhead incurred to be worthwhile. Second, your worker function uses numpy, which itself, depending on what it is doing, sometimes uses multiprocessing internally for some of its method calls; layering your own multiprocessing on top of that buys you nothing. The fact that the worker function is short and that the numpy methods are written in C and execute quickly makes this an example of a rather trivial worker function.
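To get a feel for the pickling cost mentioned above, here is a minimal sketch of my own (not part of the benchmark below) that simply pickles and unpickles the same arguments the pool would have to ship to a worker on every call:
import pickle
import time
import numpy as np

Numbers = np.array([3, 4, 5, 7, 8, 10, 20])
limit1 = np.array([0, 2, 5, 12, 15])

t = time.time()
for _ in range(100_000):
    payload = pickle.dumps((Numbers, limit1))  # what the pool sends to a worker
    pickle.loads(payload)                      # what the worker does on receipt
print('round-trip pickling time for 100,000 calls:', time.time() - t)
This serialization happens in addition to the process-startup cost, and it is incurred for every task submitted to the pool.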
Here is a benchmark that makes the 3 formating calls in a loop iterated 100 times and measures the elapsed time, then repeats the same thing using a multiprocessing pool of size 3 with method Pool.map, and then again using method Pool.apply_async (for this example I would expect the last two multiprocessing cases to take more or less the same running time):
import multiprocessing as mp
import numpy as np
from functools import partial
import time
def formating(a, b):
    # Formating goes here
    x = np.sort(b)
    # digitize
    l = np.digitize(a, x)
    # output:
    result = np.bincount(l, weights=a)
    return result

# needed for Windows:
if __name__ == '__main__':
    Numbers = np.array([3, 4, 5, 7, 8, 10, 20])
    limit1 = np.array([0, 2, 5, 12, 15])
    limit2 = np.array([0, 2, 5, 12])
    limit3 = np.array([0, 2, 5, 12, 15, 22])

    TRIALS = 100

    # non-multiprocessing:
    t = time.time()
    for _ in range(TRIALS):
        result1 = formating(Numbers, limit1)
        result2 = formating(Numbers, limit2)
        result3 = formating(Numbers, limit3)
    elapsed = time.time() - t
    print(result1, result2, result3, elapsed)

    # multiprocessing version 1 (using method map):
    # since the first argument to formating is always the same:
    worker = partial(formating, Numbers)
    t = time.time()
    for _ in range(TRIALS):
        with mp.Pool(3) as pool:
            result1, result2, result3 = pool.map(worker, [limit1, limit2, limit3])
    elapsed = time.time() - t
    print(result1, result2, result3, elapsed)

    # multiprocessing version 2 (using method apply_async):
    t = time.time()
    for _ in range(TRIALS):
        with mp.Pool(3) as pool:
            results = [pool.apply_async(formating, args=(Numbers, limit)) for limit in [limit1, limit2, limit3]]
            result1, result2, result3 = [result.get() for result in results]
    elapsed = time.time() - t
    print(result1, result2, result3, elapsed)
Prints:
[ 0. 0. 7. 30. 0. 20.] [ 0. 0. 7. 30. 20.] [ 0. 0. 7. 30. 0. 20.] 0.00299835205078125
[ 0. 0. 7. 30. 0. 20.] [ 0. 0. 7. 30. 20.] [ 0. 0. 7. 30. 0. 20.] 27.002381324768066
[ 0. 0. 7. 30. 0. 20.] [ 0. 0. 7. 30. 20.] [ 0. 0. 7. 30. 0. 20.] 27.023000240325928
Results

The multiprocessing versions only run about 9,000 times slower (and there is no difference between using map and apply_async).
If I take the overhead of creating the pool out of the benchmark, things improve greatly:
import multiprocessing as mp
import numpy as np
from functools import partial
import time
def formating(a, b):
    # Formating goes here
    x = np.sort(b)
    # digitize
    l = np.digitize(a, x)
    # output:
    result = np.bincount(l, weights=a)
    return result

# needed for Windows:
if __name__ == '__main__':
    Numbers = np.array([3, 4, 5, 7, 8, 10, 20])
    limit1 = np.array([0, 2, 5, 12, 15])
    limit2 = np.array([0, 2, 5, 12])
    limit3 = np.array([0, 2, 5, 12, 15, 22])

    TRIALS = 100

    # multiprocessing version 1 (using method map):
    # since the first argument to formating is always the same:
    worker = partial(formating, Numbers)
    with mp.Pool(3) as pool:
        t = time.time()
        for _ in range(TRIALS):
            result1, result2, result3 = pool.map(worker, [limit1, limit2, limit3])
        elapsed = time.time() - t
        print(result1, result2, result3, elapsed)
Prints:
[ 0. 0. 7. 30. 0. 20.] [ 0. 0. 7. 30. 20.] [ 0. 0. 7. 30. 0. 20.] 0.32500314712524414
But it still takes .325 seconds compared to .003 seconds. This just shows you that the major overhead is in creating the pool. Nevertheless, you still have to create the pool and account for that overhead.
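If you want to see how much of that time is pool creation alone, a quick sketch of my own (not part of the benchmark above) is to time an empty create/teardown loop:
import multiprocessing as mp
import time

if __name__ == '__main__':
    TRIALS = 100
    t = time.time()
    for _ in range(TRIALS):
        with mp.Pool(3):
            pass  # create the pool of 3 workers and immediately tear it down
    print('pool creation/teardown only:', time.time() - t)
This is why reusing a single pool for the lifetime of the program, as in the last benchmark, is the only way the approach can come close to paying off.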
That is how you would do it, but in this case, don't.