使用 Python 的多处理计算来自一长输入行的整数总和
Using Python's multiprocessing to calculate the sum of integers from one long input line
我想将 Python 的多处理模块用于以下目的:
将输入行映射到整数列表并计算此列表的总和。
输入行最初是一个字符串,其中要求和的项目由空格分隔。
我试过的是这样的:
from itertools import imap
my_input = '1000000000 ' * int(1e6)
print sum(imap(int, my_input.split()))
这在我的机器上大约需要 600 毫秒,但我想通过多处理让它更快。
似乎瓶颈在映射部分,因为求和方法在应用于准备好的整数列表时非常快:
>>> int_list = [int(1e9)] * int(1e6)
>>> %time sum(int_list)
CPU times: user 7.38 ms, sys: 5 µs, total: 7.38 ms
Wall time: 7.4 ms
>>> 1000000000000000
我尝试应用 中的说明,但由于我对使用多处理还很陌生,所以我无法按照说明解决这个问题。
所以,这似乎大致归结为三个步骤:
- 做一个pool
- 在该池中的列表中映射 int()
- 对结果求和。
所以:
if __name__ == '__main__':
import multiprocessing
my_input = '1000000000 ' * int(1e6)
string_list = my_input.split()
# Pool over all CPUs
int_list = multiprocessing.Pool().map(int, string_list)
print sum(int_list)
尽可能使用发电机可能更节省时间:
if __name__ == '__main__':
import multiprocessing
import re
my_input = '1000000000 ' * int(1e6)
# use a regex iterator matching whitespace
string_list = (x.group(0) for x in re.finditer(r'[^\s]+\s', my_input))
# Pool over all CPUs
int_list = multiprocessing.Pool().imap(int, string_list)
print sum(int_list)
正则表达式可能会比 split
慢,但使用 re.finditer
应该允许 Pool
开始映射的速度与进行单个拆分一样快,并且使用 imap
而不是 map
应该为 sum
做类似的事情(允许它在数字可用时开始添加)。 re.finditer
想法归功于 this answer。
多进程可能比单进程更有效率,也可能不会。与一次完成所有事情所获得的时间相比,您最终可能会浪费更多时间来制定新流程并将结果传回。如果您也尝试将添加项放入池中,情况也是如此。
在我正在测试的系统上,它有两个 CPU,我在大约半秒内得到 运行 的单进程解决方案,在大约 1 秒内得到非生成器多进程解决方案,并在 12-13 秒内生成生成器解决方案。
使用名为 forking 的 Unix 系统功能,您可以零开销从父进程读取(而非写入)数据。通常,您必须将数据复制过来,但在 Unix 中分叉一个进程可以让您避免这种情况。
使用这个,池中的作业可以访问整个输入字符串并提取它要处理的部分。然后它可以自行拆分和解析字符串的这一部分,并 return 其部分中整数的总和。
from multiprocessing import Pool, cpu_count
from time import time
def serial(data):
return sum(map(int, data.split()))
def parallel(data):
processes = cpu_count()
with Pool(processes) as pool:
args = zip(
["input_"] * processes, # name of global to access
range(processes), # job number
[processes] * processes # total number of jobs
)
return sum(pool.map(job, args, chunksize=1))
def job(args):
global_name, job_number, total_jobs = args
data = globals()[global_name]
chunk = get_chunk(data, job_number, total_jobs)
return serial(chunk)
def get_chunk(string, job_number, total_jobs):
"""This function may mess up if the number of integers in each chunk is low (1-2).
It also assumes there is only 1 space separating integers."""
approx_chunk_size = len(string) // total_jobs
# initial estimates
start = approx_chunk_size * job_number
end = start + approx_chunk_size
if start and not string.startswith(" ", start - 1):
# if string[start] is not beginning of a number, advance to start of next number
start = string.index(" ", start) + 1
if job_number == total_jobs:
# last job
end = None
elif not string.startswith(" ", end - 1):
# if string[end] is part of a number, then advance to end of number
end = string.index(" ", end - 1)
return string[start:end]
def timeit(func, *args, **kwargs):
"Simple timing function"
start = time()
result = func(*args, **kwargs)
end = time()
print("{} took {} seconds".format(func.__name__, end - start))
return result
if __name__ == "__main__":
# from multiprocessing.dummy import Pool # uncomment this for testing
input_ = "1000000000 " * int(1e6)
actual = timeit(parallel, input_)
expected = timeit(serial, input_)
assert actual == expected
我想将 Python 的多处理模块用于以下目的: 将输入行映射到整数列表并计算此列表的总和。
输入行最初是一个字符串,其中要求和的项目由空格分隔。
我试过的是这样的:
from itertools import imap
my_input = '1000000000 ' * int(1e6)
print sum(imap(int, my_input.split()))
这在我的机器上大约需要 600 毫秒,但我想通过多处理让它更快。
似乎瓶颈在映射部分,因为求和方法在应用于准备好的整数列表时非常快:
>>> int_list = [int(1e9)] * int(1e6)
>>> %time sum(int_list)
CPU times: user 7.38 ms, sys: 5 µs, total: 7.38 ms
Wall time: 7.4 ms
>>> 1000000000000000
我尝试应用
所以,这似乎大致归结为三个步骤:
- 做一个pool
- 在该池中的列表中映射 int()
- 对结果求和。
所以:
if __name__ == '__main__':
import multiprocessing
my_input = '1000000000 ' * int(1e6)
string_list = my_input.split()
# Pool over all CPUs
int_list = multiprocessing.Pool().map(int, string_list)
print sum(int_list)
尽可能使用发电机可能更节省时间:
if __name__ == '__main__':
import multiprocessing
import re
my_input = '1000000000 ' * int(1e6)
# use a regex iterator matching whitespace
string_list = (x.group(0) for x in re.finditer(r'[^\s]+\s', my_input))
# Pool over all CPUs
int_list = multiprocessing.Pool().imap(int, string_list)
print sum(int_list)
正则表达式可能会比 split
慢,但使用 re.finditer
应该允许 Pool
开始映射的速度与进行单个拆分一样快,并且使用 imap
而不是 map
应该为 sum
做类似的事情(允许它在数字可用时开始添加)。 re.finditer
想法归功于 this answer。
多进程可能比单进程更有效率,也可能不会。与一次完成所有事情所获得的时间相比,您最终可能会浪费更多时间来制定新流程并将结果传回。如果您也尝试将添加项放入池中,情况也是如此。
在我正在测试的系统上,它有两个 CPU,我在大约半秒内得到 运行 的单进程解决方案,在大约 1 秒内得到非生成器多进程解决方案,并在 12-13 秒内生成生成器解决方案。
使用名为 forking 的 Unix 系统功能,您可以零开销从父进程读取(而非写入)数据。通常,您必须将数据复制过来,但在 Unix 中分叉一个进程可以让您避免这种情况。
使用这个,池中的作业可以访问整个输入字符串并提取它要处理的部分。然后它可以自行拆分和解析字符串的这一部分,并 return 其部分中整数的总和。
from multiprocessing import Pool, cpu_count
from time import time
def serial(data):
return sum(map(int, data.split()))
def parallel(data):
processes = cpu_count()
with Pool(processes) as pool:
args = zip(
["input_"] * processes, # name of global to access
range(processes), # job number
[processes] * processes # total number of jobs
)
return sum(pool.map(job, args, chunksize=1))
def job(args):
global_name, job_number, total_jobs = args
data = globals()[global_name]
chunk = get_chunk(data, job_number, total_jobs)
return serial(chunk)
def get_chunk(string, job_number, total_jobs):
"""This function may mess up if the number of integers in each chunk is low (1-2).
It also assumes there is only 1 space separating integers."""
approx_chunk_size = len(string) // total_jobs
# initial estimates
start = approx_chunk_size * job_number
end = start + approx_chunk_size
if start and not string.startswith(" ", start - 1):
# if string[start] is not beginning of a number, advance to start of next number
start = string.index(" ", start) + 1
if job_number == total_jobs:
# last job
end = None
elif not string.startswith(" ", end - 1):
# if string[end] is part of a number, then advance to end of number
end = string.index(" ", end - 1)
return string[start:end]
def timeit(func, *args, **kwargs):
"Simple timing function"
start = time()
result = func(*args, **kwargs)
end = time()
print("{} took {} seconds".format(func.__name__, end - start))
return result
if __name__ == "__main__":
# from multiprocessing.dummy import Pool # uncomment this for testing
input_ = "1000000000 " * int(1e6)
actual = timeit(parallel, input_)
expected = timeit(serial, input_)
assert actual == expected