使用 Python 的多处理计算来自一长输入行的整数总和

Using Python's multiprocessing to calculate the sum of integers from one long input line

我想将 Python 的多处理模块用于以下目的: 将输入行映射到整数列表并计算此列表的总和。

输入行最初是一个字符串,其中要求和的项目由空格分隔。

我试过的是这样的:

from itertools import imap

my_input = '1000000000 ' * int(1e6)
print sum(imap(int, my_input.split()))

这在我的机器上大约需要 600 毫秒,但我想通过多处理让它更快。

似乎瓶颈在映射部分,因为求和方法在应用于准备好的整数列表时非常快:

>>> int_list = [int(1e9)] * int(1e6)
>>> %time sum(int_list)
CPU times: user 7.38 ms, sys: 5 µs, total: 7.38 ms
Wall time: 7.4 ms
>>> 1000000000000000

我尝试应用 中的说明,但由于我对使用多处理还很陌生,所以我无法按照说明解决这个问题。

所以,这似乎大致归结为三个步骤:

  1. 做一个pool
  2. 在该池中的列表中映射 int()
  3. 对结果求和。

所以:

if __name__ == '__main__':
    import multiprocessing
    my_input = '1000000000 ' * int(1e6)
    string_list = my_input.split()
    # Pool over all CPUs
    int_list = multiprocessing.Pool().map(int, string_list)
    print sum(int_list)

尽可能使用发电机可能更节省时间:

if __name__ == '__main__':
    import multiprocessing
    import re
    my_input = '1000000000 ' * int(1e6)
    # use a regex iterator matching whitespace
    string_list = (x.group(0) for x in re.finditer(r'[^\s]+\s', my_input))
    # Pool over all CPUs
    int_list = multiprocessing.Pool().imap(int, string_list)
    print sum(int_list)

正则表达式可能会比 split 慢,但使用 re.finditer 应该允许 Pool 开始映射的速度与进行单个拆分一样快,并且使用 imap 而不是 map 应该为 sum 做类似的事情(允许它在数字可用时开始添加)。 re.finditer 想法归功于 this answer

多进程可能比单进程更有效率,也可能不会。与一次完成所有事情所获得的时间相比,您最终可能会浪费更多时间来制定新流程并将结果传回。如果您也尝试将添加项放入池中,情况也是如此。

在我正在测试的系统上,它有两个 CPU,我在大约半秒内得到 运行 的单进程解决方案,在大约 1 秒内得到非生成器多进程解决方案,并在 12-13 秒内生成生成器解决方案。

使用名为 forking 的 Unix 系统功能,您可以零开销从父进程读取(而非写入)数据。通常,您必须将数据复制过来,但在 Unix 中分叉一个进程可以让您避免这种情况。

使用这个,池中的作业可以访问整个输入字符串并提取它要处理的部分。然后它可以自行拆分和解析字符串的这一部分,并 return 其部分中整数的总和。

from multiprocessing import Pool, cpu_count
from time import time


def serial(data):
    return sum(map(int, data.split()))


def parallel(data):
    processes = cpu_count()

    with Pool(processes) as pool:
        args = zip(
            ["input_"] * processes, # name of global to access
            range(processes), # job number
            [processes] * processes # total number of jobs 
        )
        return sum(pool.map(job, args, chunksize=1))


def job(args):
    global_name, job_number, total_jobs = args
    data = globals()[global_name]
    chunk = get_chunk(data, job_number, total_jobs)

    return serial(chunk)


def get_chunk(string, job_number, total_jobs):
    """This function may mess up if the number of integers in each chunk is low (1-2).
    It also assumes there is only 1 space separating integers."""
    approx_chunk_size = len(string) // total_jobs

    # initial estimates
    start = approx_chunk_size * job_number
    end = start + approx_chunk_size

    if start and not string.startswith(" ", start - 1):
        # if string[start] is not beginning of a number, advance to start of next number
        start = string.index(" ", start) + 1

    if job_number == total_jobs:
        # last job
        end = None
    elif not string.startswith(" ", end - 1):
        # if string[end] is part of a number, then advance to end of number
        end = string.index(" ", end - 1)

    return string[start:end]


def timeit(func, *args, **kwargs):
    "Simple timing function"
    start = time()
    result = func(*args, **kwargs)
    end = time()
    print("{} took {} seconds".format(func.__name__, end - start))
    return result


if __name__ == "__main__":
#    from multiprocessing.dummy import Pool # uncomment this for testing

    input_ = "1000000000 " * int(1e6)

    actual = timeit(parallel, input_)
    expected = timeit(serial, input_)
    assert actual == expected