如何在 Python 中使用多处理对循环进行并行求和

How to parallel sum a loop using multiprocessing in Python

我很难理解如何使用 Python 的多处理模块。

我有一个从1n的总和,其中n=10^10太大了,无法放入列表中,这似乎是许多使用多处理的在线示例的主旨.

有没有办法"split up"将范围分成一定大小的段,然后对每个段求和?

例如

def sum_nums(low,high):
    result = 0
    for i in range(low,high+1):
        result += i
    return result

我想通过将它分解成许多 sum_nums(1,1000) + sum_nums(1001,2000) + sum_nums(2001,3000)... 来计算 sum_nums(1,10**10) 等等。我知道有一个封闭形式 n(n+1)/2 但假装我们不知道。

这是我试过的方法

import multiprocessing

def sum_nums(low,high):
    result = 0
    for i in range(low,high+1):
        result += i
    return result

if __name__ == "__main__":
    n = 1000 
    procs = 2 

    sizeSegment = n/procs

    jobs = []
    for i in range(0, procs):
        process = multiprocessing.Process(target=sum_nums, args=(i*sizeSegment+1, (i+1)*sizeSegment))
        jobs.append(process)

    for j in jobs:
        j.start()
    for j in jobs:
        j.join()

    #where is the result?

首先,解决内存问题的最佳方法是使用 iterator/generator 而不是列表:

def sum_nums(low, high):
    result = 0
    for i in xrange(low, high+1):
        result += 1
    return result

在python3中,range()产生一个迭代器,所以这只在python2

中需要

现在,当您想要将处理拆分到不同的进程或 CPU 核心时,多处理就派上用场了。如果您不需要控制单个工作人员,那么最简单的方法就是使用进程池。这将使您可以将函数映射到池并获得输出。您也可以使用 apply_async 一次将一个作业应用到池中并获得延迟结果,您可以使用 .get():

import multiprocessing
from multiprocessing import Pool
from time import time

def sum_nums(low, high):
    result = 0
    for i in xrange(low, high+1):
        result += i
    return result

# map requires a function to handle a single argument
def sn((low,high)):
    return sum_nums(low, high) 

if __name__ == '__main__': 
    #t = time()
    # takes forever   
    #print sum_nums(1,10**10)
    #print '{} s'.format(time() -t)
    p = Pool(4)

    n = int(1e8)
    r = range(0,10**10+1,n)
    results = []

    # using apply_async
    t = time()
    for arg in zip([x+1 for x in r],r[1:]):
        results.append(p.apply_async(sum_nums, arg))

    # wait for results
    print sum(res.get() for res in results)
    print '{} s'.format(time() -t)

    # using process pool
    t = time()
    print sum(p.map(sn, zip([x+1 for x in r], r[1:])))
    print '{} s'.format(time() -t)

在我的机器上,仅使用 10**10 调用 sum_nums 需要将近 9 分钟,但使用 Pool(8)n=int(1e8) 可将其缩短到一分钟多一点。

我发现 multiprocess.Pool 和 map() 的用法更简单

使用您的代码:

from multiprocessing import Pool

def sum_nums(args):
    low = int(args[0])
    high = int(args[1])
    return sum(range(low,high+1))

if __name__ == "__main__":
    n = 1000 
    procs = 2 

    sizeSegment = n/procs

    # Create size segments list
    jobs = []
    for i in range(0, procs):
        jobs.append((i*sizeSegment+1, (i+1)*sizeSegment))

    pool = Pool(procs).map(sum_nums, jobs)
    result = sum(pool)

    >>> print result
    >>> 500500

您完全可以在没有 multiprocessing 的情况下进行此求和,并且仅使用生成器可能更简单,甚至更快。

# prepare a generator of generators each at 1000 point intervals
>>> xr = (xrange(1000*i+1,i*1000+1001) for i in xrange(10000000))
>>> list(xr)[:3]
[xrange(1, 1001), xrange(1001, 2001), xrange(2001, 3001)]
# sum, using two map functions
>>> xr = (xrange(1000*i+1,i*1000+1001) for i in xrange(10000000))
>>> sum(map(sum, map(lambda x:x, xr)))
50000000005000000000L

不过,如果你想使用multiprocessing,你也可以这样做。我使用的是 multiprocessing 的一个分支,它更擅长序列化(但除此之外,并没有什么不同)。

>>> xr = (xrange(1000*i+1,i*1000+1001) for i in xrange(10000000))
>>> import pathos
>>> mmap = pathos.multiprocessing.ProcessingPool().map
>>> tmap = pathos.multiprocessing.ThreadingPool().map
>>> sum(tmap(sum, mmap(lambda x:x, xr)))
50000000005000000000L

版本 w/o multiprocessing 速度更快,在我的笔记本电脑上大约需要一分钟。由于生成多个 python 进程的开销,multiprocessing 版本需要几分钟时间。

如果您有兴趣,请在此处获取 pathoshttps://github.com/uqfoundation