Python multiprocessing is slower than regular. How can I improve?
Essentially I have a script that combs through a dataset of nodes/points to remove the ones that overlap. The actual script is more complicated, but I pared it down to a simple overlap check that does nothing with the result, just for demonstration.
I tried a few variants of locks, queues, and pools, adding one job at a time versus adding them in bulk. Some of the worst offenders were slower by a couple of orders of magnitude. Eventually I got it as fast as I could.
The overlap-checking algorithm that is sent out to the individual processes:
def check_overlap(args):
    tolerance = args['tolerance']
    this_coords = args['this_coords']
    that_coords = args['that_coords']
    overlaps = False
    distance_x = this_coords[0] - that_coords[0]
    if distance_x <= tolerance:
        distance_x = pow(distance_x, 2)
        distance_y = this_coords[1] - that_coords[1]
        if distance_y <= tolerance:
            distance = pow(distance_x + pow(distance_y, 2), 0.5)
            if distance <= tolerance:
                overlaps = True
    return overlaps
The processing function:
def process_coords(coords, num_processors=1, tolerance=1):
    import multiprocessing as mp
    import time
    if num_processors > 1:
        pool = mp.Pool(num_processors)
        start = time.time()
        print "Start script w/ multiprocessing"
    else:
        num_processors = 0
        start = time.time()
        print "Start script w/ standard processing"
    total_overlap_count = 0
    # outer loop through nodes
    start_index = 0
    last_index = len(coords) - 1
    while start_index <= last_index:
        # nature of the original problem means we can process all pairs of a single node at once, but not multiple, so batch jobs by outer loop
        batch_jobs = []
        # inner loop against all pairs for this node
        start_index += 1
        count_overlapping = 0
        for i in range(start_index, last_index+1, 1):
            if num_processors:
                # add job
                batch_jobs.append({
                    'tolerance': tolerance,
                    'this_coords': coords[start_index],
                    'that_coords': coords[i]
                })
            else:
                # synchronous processing
                this_coords = coords[start_index]
                that_coords = coords[i]
                distance_x = this_coords[0] - that_coords[0]
                if distance_x <= tolerance:
                    distance_x = pow(distance_x, 2)
                    distance_y = this_coords[1] - that_coords[1]
                    if distance_y <= tolerance:
                        distance = pow(distance_x + pow(distance_y, 2), 0.5)
                        if distance <= tolerance:
                            count_overlapping += 1
        if num_processors:
            res = pool.map_async(check_overlap, batch_jobs)
            results = res.get()
            for r in results:
                if r:
                    count_overlapping += 1
        # stuff normally happens here to process nodes connected to this node
        total_overlap_count += count_overlapping
    print total_overlap_count
    print " time: {0}".format(time.time() - start)
And the testing code:
from random import random
coords = []
num_coords = 1000
spread = 100.0
half_spread = 0.5*spread
for i in range(num_coords):
    coords.append([
        random()*spread-half_spread,
        random()*spread-half_spread
    ])
process_coords(coords, 1)
process_coords(coords, 4)
The non-multiprocessing version consistently runs in under 0.4 s, however, while the best I can get out of the multiprocessing version as written above is just under 3.0 s. I get that the algorithm here may be too simple to really reap a benefit, but considering that the case above involves nearly half a million iterations and the real case significantly more, it's strange to me that multiprocessing is an order of magnitude slower.
What am I missing, and what can I do to improve this?
Building O(N**2) 3-element dicts that aren't used in the serialized code, and shipping them across interprocess pipes, is a pretty good way to guarantee multiprocessing can't help ;-) Nothing comes for free - everything costs.
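A rough, self-contained timing sketch of that cost (my own illustration with made-up values, not code from this answer): it just pickles the kind of 3-element job dict the original code builds for every pair, and compares that against doing the distance check inline. Exact numbers vary by machine, but the pickling alone will likely cost more than the arithmetic it is meant to farm out.
# Hypothetical illustration: serialization cost per job vs. the work per job.
import pickle
import time

pairs = 500000  # roughly the number of pair checks for 1000 points
job = {'tolerance': 1, 'this_coords': (0.5, 1.5), 'that_coords': (2.5, 3.5)}

start = time.time()
for _ in range(pairs):
    pickle.dumps(job)               # what the pool must do for every job it ships
print("pickling only: {0:.2f}s".format(time.time() - start))

start = time.time()
tolerance_sq = 1
for _ in range(pairs):
    dx = 2.5 - 0.5
    dy = 3.5 - 1.5
    overlapping = dx * dx + dy * dy <= tolerance_sq   # the actual work being farmed out
print("inline check:  {0:.2f}s".format(time.time() - start))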
Below is a rewrite that executes much the same code regardless of whether it runs in serial or multiprocessing mode. No new dicts, etc. In general, the larger len(coords), the more benefit it gets from multiprocessing. On my box, at 20000 points the multiprocessing run takes about a third of the wall-clock time.
The key is that all processes have their own copy of coords. This is done below by transmitting it just once, when the pool is created, and that should work on all platforms. On Linux-y systems it could instead happen "by magic" via forked-process inheritance. Reducing the amount of data sent across processes from O(N**2) to O(N) is a huge improvement.
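For the "by magic" fork-inheritance case mentioned above, here is a minimal standalone sketch (hypothetical names, separate from the rewrite that follows): a module-level global set in the parent before the pool is created is simply visible inside the forked workers, with nothing pickled per task.
# Fork-based inheritance sketch; requires a platform where the "fork"
# start method is available (e.g. Linux), it will not work with "spawn".
import multiprocessing as mp

_shared = None  # set once in the parent before the pool forks

def worker(i):
    # each forked worker sees the parent's _shared; nothing is sent per task
    return _shared[i] * 2

if __name__ == "__main__":
    _shared = list(range(10))
    ctx = mp.get_context("fork")
    with ctx.Pool(2) as pool:
        print(pool.map(worker, range(10)))   # [0, 2, 4, ..., 18]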
Getting more out of multiprocessing would require better load balancing. As is, a call to check_overlap(i) compares coords[i] to every value in coords[i+1:]. The larger i, the less work there is for it to do, and for the largest values of i just the cost of transmitting i between processes - and of transmitting the result back - swamps the time spent in check_overlap(i). A possible load-balancing tweak is sketched after the code below.
def init(*args):
    global _coords, _tolerance
    _coords, _tolerance = args

def check_overlap(start_index):
    coords, tolerance = _coords, _tolerance
    tsq = tolerance ** 2
    overlaps = 0
    start0, start1 = coords[start_index]
    for i in range(start_index + 1, len(coords)):
        that0, that1 = coords[i]
        dx = abs(that0 - start0)
        if dx <= tolerance:
            dy = abs(that1 - start1)
            if dy <= tolerance:
                if dx**2 + dy**2 <= tsq:
                    overlaps += 1
    return overlaps

def process_coords(coords, num_processors=1, tolerance=1):
    global _coords, _tolerance
    import multiprocessing as mp
    _coords, _tolerance = coords, tolerance
    import time
    if num_processors > 1:
        pool = mp.Pool(num_processors, initializer=init, initargs=(coords, tolerance))
        start = time.time()
        print("Start script w/ multiprocessing")
    else:
        num_processors = 0
        start = time.time()
        print("Start script w/ standard processing")
    N = len(coords)
    if num_processors:
        total_overlap_count = sum(pool.imap_unordered(check_overlap, range(N)))
    else:
        total_overlap_count = sum(check_overlap(i) for i in range(N))
    print(total_overlap_count)
    print(" time: {0}".format(time.time() - start))

if __name__ == "__main__":
    from random import random
    coords = []
    num_coords = 20000
    spread = 100.0
    half_spread = 0.5*spread
    for i in range(num_coords):
        coords.append([
            random()*spread-half_spread,
            random()*spread-half_spread
        ])
    process_coords(coords, 1)
    process_coords(coords, 4)
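As one possible direction for the load balancing mentioned above (a sketch of my own, not part of the rewrite): pair a cheap high index with an expensive low index so every task does roughly the same number of comparisons, and pass a chunksize so fewer pickled messages cross the pipe. check_overlap_pair needs to live at module level so the pool can pickle a reference to it, and it relies on the _coords/_tolerance globals that init() already sets in each worker.
# Possible load-balancing tweak, assuming the rewrite above is in place.
def check_overlap_pair(indices):
    # sum the overlap counts for one small bundle of start indices
    return sum(check_overlap(i) for i in indices)

def balanced_tasks(n):
    # (0, n-1), (1, n-2), ...: each pair covers roughly n-1 comparisons,
    # so tasks stay about the same size instead of shrinking as i grows
    for lo in range((n + 1) // 2):
        hi = n - 1 - lo
        yield (lo,) if lo == hi else (lo, hi)

# Inside process_coords(), the multiprocessing branch could then become:
#     total_overlap_count = sum(
#         pool.imap_unordered(check_overlap_pair, balanced_tasks(N), chunksize=64))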