将多处理应用于不同版本的多处理

Applying multi processing to different versions f multi processing

我想 运行 所有 formatting 功能与 multiprocessing 功能同步。我如何才能添加此功能,使其 运行s 与下面的多处理功能同步 我尝试这样做,但它不起作用。本质上,我想同时 运行 formating(Numbers, limit1)formating(Numbers, limit2)formating(Numbers, limit3)

代码:

import multiprocessing as mp
import numpy as np
def formating(a, b):
    # Formating goes here
    x = np.sort(b);
    # digitize
    l = np.digitize(a, x)
    # output:
    result = np.bincount(l, weights=a)
    return result

Numbers = np.array([3, 4, 5, 7, 8, 10,20])
limit1 = np.array([0, 2 , 5, 12, 15])
limit2 = np.array([0, 2 , 5, 12])
limit3 = np.array([0, 2 , 5, 12, 15, 22])
formating(Numbers, limit1)
formating(Numbers, limit2)
formating(Numbers, limit3)

我可以告诉你怎么做,但你会对结果很不满意。有两个问题。第一个问题是,在创建进程池和将参数传递给辅助函数并取回 return 值时会产生一定的开销,因为辅助函数“存在于”不同的地址 space 和参数以及 return 值必须被“pickled”和“unpickled”才能传输。因此,在您的情况下,工作函数 formating 需要非常重要才能使产生的开销值得。其次,您的 worker 函数使用 numpy,它本身取决于它正在做什么,有时会在内部对其某些方法调用使用多处理。在它之上使用您自己的多处理不会给您带来任何好处。事实上,worker 函数很短,numpy 方法是用 C 语言编写的,执行速度很快,这是一个相当简单的 worker 函数的例子。

以下是在一个循环中进行 3 formating 调用的基准,该循环迭代 100 次并计算经过的时间,然后使用大小为 3 的多处理池和方法 Pool.map 然后再次使用方法 Pool.apply_async (对于这个例子,我希望最后两个多处理案例在 运行 时间内或多或少相同):

import multiprocessing as mp
import numpy as np
from functools import partial
import time

def formating(a, b):
    # Formating goes here
    x = np.sort(b);
    # digitize
    l = np.digitize(a, x)
    # output:
    result = np.bincount(l, weights=a)
    return result

# needed for Windows:
if __name__ == '__main__':
    Numbers = np.array([3, 4, 5, 7, 8, 10,20])
    limit1 = np.array([0, 2 , 5, 12, 15])
    limit2 = np.array([0, 2 , 5, 12])
    limit3 = np.array([0, 2 , 5, 12, 15, 22])

    TRIALS = 100

    # non-multiprocessing:
    t = time.time()
    for _ in range(TRIALS):
        result1 = formating(Numbers, limit1)
        result2 = formating(Numbers, limit2)
        result3 = formating(Numbers, limit3)
    elapsed = time.time() - t
    print(result1, result2, result3, elapsed)

    # multiprocessing version 1 (using method map):
    # since first argument to formating is always the same:
    worker = partial(formating, Numbers)
    t = time.time()
    for _ in range(TRIALS):
        with mp.Pool(3) as pool:
            result1, result2, result3 = pool.map(worker, [limit1, limit2, limit3])
    elapsed = time.time() - t
    print(result1, result2, result3, elapsed)

    # multiprocessing version 2 (using method apply_async)
    t = time.time()
    for _ in range(TRIALS):
        with mp.Pool(3) as pool:
            results = [pool.apply_async(formating, args=(Numbers, limit)) for limit in [limit1, limit2, limit3]]
            result1, result2, result3 = [result.get() for result in results]
    elapsed = time.time() - t
    print(result1, result2, result3, elapsed)

打印:

[ 0.  0.  7. 30.  0. 20.] [ 0.  0.  7. 30. 20.] [ 0.  0.  7. 30.  0. 20.] 0.00299835205078125
[ 0.  0.  7. 30.  0. 20.] [ 0.  0.  7. 30. 20.] [ 0.  0.  7. 30.  0. 20.] 27.002381324768066
[ 0.  0.  7. 30.  0. 20.] [ 0.  0.  7. 30. 20.] [ 0.  0.  7. 30.  0. 20.] 27.023000240325928

结果

多处理版本只慢了大约 9000 倍(使用 mapapply_async 没有区别)。

如果我从基准测试中去掉创建池的开销,事情就会大大改善:

import multiprocessing as mp
import numpy as np
from functools import partial
import time

def formating(a, b):
    # Formating goes here
    x = np.sort(b);
    # digitize
    l = np.digitize(a, x)
    # output:
    result = np.bincount(l, weights=a)
    return result

# needed for Windows:
if __name__ == '__main__':
    Numbers = np.array([3, 4, 5, 7, 8, 10,20])
    limit1 = np.array([0, 2 , 5, 12, 15])
    limit2 = np.array([0, 2 , 5, 12])
    limit3 = np.array([0, 2 , 5, 12, 15, 22])

    TRIALS = 100

    # multiprocessing version 1 (using method map):
    # since first argument to formating is always the same:
    worker = partial(formating, Numbers)
    with mp.Pool(3) as pool:
        t = time.time()
        for _ in range(TRIALS):
            result1, result2, result3 = pool.map(worker, [limit1, limit2, limit3])
        elapsed = time.time() - t
    print(result1, result2, result3, elapsed)

打印:

[ 0.  0.  7. 30.  0. 20.] [ 0.  0.  7. 30. 20.] [ 0.  0.  7. 30.  0. 20.] 0.32500314712524414

但与 0.003 秒相比,它仍然需要 0.325 秒。这只是向您展示了主要的开销在于池的创建——但是您仍然必须创建池并考虑该开销。

这是你如何做的,但在这种情况下不要这样做