通过并行化加速随机数生成

Speeding up random number generation by parallelizing

我需要使用来自标准正态分布的随机数创建许多大型 numpy 数组 (4e6, 100),我正在努力加快速度。我尝试使用多个内核生成阵列的不同部分,但没有获得预期的速度改进。是我做错了什么,还是我期望以这种方式提高速度是错误的?

from numpy.random import default_rng
from multiprocessing import Pool
from time import time


def rng_mp(rng):
    return rng.standard_normal((250000, 100))


if __name__ == '__main__':

    n_proc = 4
    rngs = [default_rng(n) for n in range(n_proc)]
    rng_all = default_rng(1)

    start = time()
    result = rng_all.standard_normal((int(1e6), 100))
    print(f'Single process: {time() - start:.3f} seconds')

    start = time()
    with Pool(processes=n_proc) as p:
        result = p.map_async(rng_mp, rngs).get()
    print(f'MP: {time() - start:.3f} seconds')

    # Single process: 1.114 seconds
    # MP: 2.634 seconds

这并不是对原始问题的回答——更多的是后续问题,我无法回答。

我重新整理了代码,想看看到底发生了什么。

from numpy.random import default_rng
from concurrent.futures import ProcessPoolExecutor
import time

NPROC = 4

def rng_mp(i):
    s = time.perf_counter()
    r = default_rng(i).standard_normal((250000, 100))
    e = time.perf_counter()
    print(f'Process {i} {e-s:.2f}s')
    return r


if __name__ == '__main__':
    start = time.perf_counter()
    with ProcessPoolExecutor() as executor:
        for fr in [executor.submit(rng_mp, i) for i in range(NPROC)]:
            s = time.perf_counter()
            fr.result()
            e = time.perf_counter()
            print(f'Result time {e-s:.2f}')
    end = time.perf_counter()
    print(f'Overall {end - start:.3f} seconds')

典型的输出如下:

进程 0 0.33s
进程 2 0.33s
进程 1 0.33s
进程 3 0.33s
结果时间 2.27
结果时间5.57
结果时间 0.00
结果时间 0.00
总计 7.999 秒

换句话说,ring_mp() 进程会及时执行。 但是 延迟似乎是在获取结果时,我只能猜测这与在子进程和主进程之间移动大量内存有关。 FWIW 在 macOS 12.0.1 上我是 运行 numpy 1.21.4 和 Python 3.9.8。我无法解释这个。

更新:根据@Booboo 的回答,我改为使用 ThreadPoolExecutor(无需其他更改),结果如下:

进程 3 0.34s
进程 1 0.35s
进程 0 0.35s
结果时间0.35
结果时间 0.00
进程 2 0.35s
结果时间 0.00
结果时间 0.00
总计 0.388 秒

我怀疑速度变慢的原因仅仅是因为您需要将大量数据从子进程的地址空间移回主进程。我还怀疑用于随机数生成的 C 语言实现 numpy 释放了全局解释器锁,并且使用多线程而不是多处理可以解决您的性能问题:

from numpy.random import default_rng
from multiprocessing.pool import ThreadPool
from time import time


def rng_mp(rng):
    return rng.standard_normal((250000, 100))


if __name__ == '__main__':

    n_proc = 4
    rngs = [default_rng(n) for n in range(n_proc)]
    rng_all = default_rng(1)

    start = time()
    result = rng_all.standard_normal((int(1e6), 100))
    print(f'Single process: {time() - start:.3f} seconds')

    start = time()
    with ThreadPool(processes=n_proc) as p:
        result = p.map_async(rng_mp, rngs).get()
    print(f'MT: {time() - start:.3f} seconds')

打印:

Single process: 1.210 seconds
MT: 0.413 seconds

我很感谢其他贡献者提出这个建议,但我找到了一种比其他建议更快的方法,因为它使用现有数组的填充而不是创建新数组。它是对 numpy 文档 here 的改编,针对二维数组进行了优化。

from numpy.random import default_rng, SeedSequence
import multiprocessing
import concurrent.futures
import numpy as np
from time import time


class MultithreadedRNG2D:
    def __init__(self, shape, seed=None, threads=None):
        if threads is None:
            threads = multiprocessing.cpu_count()
        self.threads = threads

        seq = SeedSequence(seed)
        self._random_generators = [default_rng(s)
                                   for s in seq.spawn(threads)]

        self.shape = shape
        self.executor = concurrent.futures.ThreadPoolExecutor(threads)
        self.values = np.empty(shape)
        self.steps = [(t * (shape[0] // threads), (t + 1) * (shape[0] // threads))
                      if t < (threads - 1)
                      else (t * (shape[0] // threads), shape[0])
                      for t in range(threads)]

    def fill(self):
        def _fill(random_state, out, firstrow, lastrow):
            random_state.standard_normal(out=out[firstrow:lastrow])

        futures = {}
        for i in range(self.threads):
            args = (_fill,
                    self._random_generators[i],
                    self.values,
                    self.steps[i][0],
                    self.steps[i][1])
            futures[self.executor.submit(*args)] = i
        concurrent.futures.wait(futures)

    def __del__(self):
        self.executor.shutdown(False)


mrng = MultithreadedRNG2D((int(1e6), 100), seed=1, threads=4)
start = time()
mrng.fill()
print(f'MT: {time() - start:.3f} seconds')

# MT: 0.336 seconds

我的另一个答案中的逻辑现在在一个包中实现 mtalg 设计用于使用多线程生成随机数。

from mtalg.random import MultithreadedRNG
mrng = MultithreadedRNG(seed=1, num_threads=4)
mrng.standard_normal(size=(int(1e6), 100))