python 中嵌套循环的性能问题
Performance issue in python with nested loop
由于点积,我能够使用 numpy 改进用 python 编写的代码。现在我还有一部分代码仍然很慢。我仍然不了解多线程,如果这对这里有帮助的话。在我看来,这应该是可能的。你知道在这里做什么好吗?
for x1 in range(a**l):
for x2 in range(a**l):
for x3 in range(a**l):
f11 = 0
cv1 = numpy.ndarray.sum(
numpy.absolute(numpy.subtract(ws[x1], ws[x2])))
cv2 = numpy.ndarray.sum(
numpy.absolute(numpy.subtract(ws[x1], ws[x3])))
if cv1 == 0:
f11 += 1
if cv2 == 0:
f11 += 1
re[x1][x2][x3] = 1.0*r/(a**l-2)*(numpy.product(numpy.absolute(
numpy.subtract((2*ws[x1]+ws[x2]+ws[x3]), 2)))-f11)
f11 *= 1.0*(1-r)/2
re[x1][x2][x3] += f11
在担心多处理之前,请尝试使用普通 numpy 提供的功能。
首先,确保 ws
是 numpy 数组,而不是此类数组的列表。那么
cv1 = numpy.ndarray.sum(numpy.absolute(numpy.subtract(ws[x1], ws[x2])))
if cv1 == 0:
f11 += 1
变成f11 = np.nonzero(ws[x1] == ws[x2])
.
对其余代码执行相同的操作,您将能够看到更多结构:np.product
就是 *
等等。
然后,re[x1][x2][x3]
不是您通常索引 numpy 数组的方式,请使用 re[x1, x2, x3]
。仅此一项就可以为您节省大量时间和内存分配。
一旦完成,看看你是否真的可以向量化表达式,即使用 numpy 全数组操作而不是普通的 python 循环(你很可能可以,但是用代码片段的当前状态)。
我试图重新创建问题感兴趣的条件,但首先是一个较小的测试用例来说明策略。先说作者的原实现:
import numpy as np
import numba as nb
import numpy
def func(re, ws, a, l, r):
for x1 in range(a**l):
for x2 in range(a**l):
for x3 in range(a**l):
f11 = 0
cv1 = numpy.ndarray.sum(
numpy.absolute(numpy.subtract(ws[x1], ws[x2])))
cv2 = numpy.ndarray.sum(
numpy.absolute(numpy.subtract(ws[x1], ws[x3])))
if cv1 == 0:
f11 += 1
if cv2 == 0:
f11 += 1
re[x1][x2][x3] = 1.0*r/(a**l-2)*(numpy.product(numpy.absolute(
numpy.subtract((2*ws[x1]+ws[x2]+ws[x3]), 2)))-f11)
f11 *= 1.0*(1-r)/2
re[x1][x2][x3] += f11
现在简单翻译成Numba,当你处理numpy数组和数值计算时,它非常适合这些类型的深度嵌套循环问题:
@nb.njit
def func2(re, ws, a, l, r):
for x1 in range(a**l):
for x2 in range(a**l):
for x3 in range(a**l):
f11 = 0.0
cv1 = np.sum(np.abs(ws[x1] - ws[x2]))
cv2 = np.sum(np.abs(ws[x1] - ws[x3]))
if cv1 == 0:
f11 += 1
if cv2 == 0:
f11 += 1
y = np.prod(np.abs(2*ws[x1]+ws[x2]+ws[x3] - 2)) - f11
re[x1,x2,x3] = 1.0*r/(a**l-2)*y
f11 *= 1.0*(1-r)/2
re[x1,x2,x3] += f11
然后进行一些进一步的优化以摆脱临时数组的创建:
@nb.njit
def func3(re, ws, a, l, r):
for x1 in range(a**l):
for x2 in range(a**l):
for x3 in range(a**l):
f11 = 0.0
cv1 = 0.0
cv2 = 0.0
for i in range(ws.shape[1]):
cv1 += np.abs(ws[x1,i] - ws[x2,i])
cv2 += np.abs(ws[x1,i] - ws[x3,i])
if cv1 == 0:
f11 += 1
if cv2 == 0:
f11 += 1
y = 1.0
for i in range(ws.shape[1]):
y *= np.abs(2.0*ws[x1,i] + ws[x2,i] + ws[x3,i] - 2)
y -= f11
re[x1,x2,x3] = 1.0*r/(a**l-2)*y
f11 *= 1.0*(1-r)/2
re[x1,x2,x3] += f11
所以一些简单的测试数据:
a = 2
l = 5
r = 0.2
wp = (numpy.arange(2**l)[:,None] >> numpy.arange(l)[::-1]) & 1
wp = numpy.hstack([wp.sum(1,keepdims=True), wp])
ws = wp[:, 3:l+3]
re = numpy.zeros((a**l, a**l, a**l))
现在让我们检查所有三个函数是否产生相同的结果:
re = numpy.zeros((a**l, a**l, a**l))
func(re, ws, a, l, r)
re2 = numpy.zeros((a**l, a**l, a**l))
func2(re2, ws, a, l, r)
re3 = numpy.zeros((a**l, a**l, a**l))
func3(re3, ws, a, l, r)
print np.allclose(re, re2) # True
print np.allclose(re, re3) # True
以及一些使用 jupyter notebook 的初始计时 %timeit
魔法:
%timeit func(re, ws, a, l, r)
%timeit func2(re2, ws, a, l, r)
%timeit func3(re3, ws, a, l, r)
1 loop, best of 3: 404 ms per loop
100 loops, best of 3: 14.2 ms per loop
1000 loops, best of 3: 605 µs per loop
func2
比原始实现快约 28 倍。 func3
快了约 680 倍。请注意,我 运行 使用的是配备 i7 处理器、16 GB RAM 并使用 Numba 0.25.0 的 Macbook 笔记本电脑。
好的,现在让我们为每个人都在绞尽脑汁的 a=2
l=10
案例计时:
a = 2
l = 10
r = 0.2
wp = (numpy.arange(2**l)[:,None] >> numpy.arange(l)[::-1]) & 1
wp = numpy.hstack([wp.sum(1,keepdims=True), wp])
ws = wp[:, 3:l+3]
re = numpy.zeros((a**l, a**l, a**l))
print 'setup complete'
%timeit -n 1 -r 1 func3(re, ws, a, l, r)
# setup complete
# 1 loop, best of 1: 45.4 s per loop
所以这在我的单线程机器上花费了 45 秒,如果你不这样做的话,这似乎是合理的然后计算太多次。
由于点积,我能够使用 numpy 改进用 python 编写的代码。现在我还有一部分代码仍然很慢。我仍然不了解多线程,如果这对这里有帮助的话。在我看来,这应该是可能的。你知道在这里做什么好吗?
for x1 in range(a**l):
for x2 in range(a**l):
for x3 in range(a**l):
f11 = 0
cv1 = numpy.ndarray.sum(
numpy.absolute(numpy.subtract(ws[x1], ws[x2])))
cv2 = numpy.ndarray.sum(
numpy.absolute(numpy.subtract(ws[x1], ws[x3])))
if cv1 == 0:
f11 += 1
if cv2 == 0:
f11 += 1
re[x1][x2][x3] = 1.0*r/(a**l-2)*(numpy.product(numpy.absolute(
numpy.subtract((2*ws[x1]+ws[x2]+ws[x3]), 2)))-f11)
f11 *= 1.0*(1-r)/2
re[x1][x2][x3] += f11
在担心多处理之前,请尝试使用普通 numpy 提供的功能。
首先,确保 ws
是 numpy 数组,而不是此类数组的列表。那么
cv1 = numpy.ndarray.sum(numpy.absolute(numpy.subtract(ws[x1], ws[x2])))
if cv1 == 0:
f11 += 1
变成f11 = np.nonzero(ws[x1] == ws[x2])
.
对其余代码执行相同的操作,您将能够看到更多结构:np.product
就是 *
等等。
然后,re[x1][x2][x3]
不是您通常索引 numpy 数组的方式,请使用 re[x1, x2, x3]
。仅此一项就可以为您节省大量时间和内存分配。
一旦完成,看看你是否真的可以向量化表达式,即使用 numpy 全数组操作而不是普通的 python 循环(你很可能可以,但是用代码片段的当前状态)。
我试图重新创建问题感兴趣的条件,但首先是一个较小的测试用例来说明策略。先说作者的原实现:
import numpy as np
import numba as nb
import numpy
def func(re, ws, a, l, r):
for x1 in range(a**l):
for x2 in range(a**l):
for x3 in range(a**l):
f11 = 0
cv1 = numpy.ndarray.sum(
numpy.absolute(numpy.subtract(ws[x1], ws[x2])))
cv2 = numpy.ndarray.sum(
numpy.absolute(numpy.subtract(ws[x1], ws[x3])))
if cv1 == 0:
f11 += 1
if cv2 == 0:
f11 += 1
re[x1][x2][x3] = 1.0*r/(a**l-2)*(numpy.product(numpy.absolute(
numpy.subtract((2*ws[x1]+ws[x2]+ws[x3]), 2)))-f11)
f11 *= 1.0*(1-r)/2
re[x1][x2][x3] += f11
现在简单翻译成Numba,当你处理numpy数组和数值计算时,它非常适合这些类型的深度嵌套循环问题:
@nb.njit
def func2(re, ws, a, l, r):
for x1 in range(a**l):
for x2 in range(a**l):
for x3 in range(a**l):
f11 = 0.0
cv1 = np.sum(np.abs(ws[x1] - ws[x2]))
cv2 = np.sum(np.abs(ws[x1] - ws[x3]))
if cv1 == 0:
f11 += 1
if cv2 == 0:
f11 += 1
y = np.prod(np.abs(2*ws[x1]+ws[x2]+ws[x3] - 2)) - f11
re[x1,x2,x3] = 1.0*r/(a**l-2)*y
f11 *= 1.0*(1-r)/2
re[x1,x2,x3] += f11
然后进行一些进一步的优化以摆脱临时数组的创建:
@nb.njit
def func3(re, ws, a, l, r):
for x1 in range(a**l):
for x2 in range(a**l):
for x3 in range(a**l):
f11 = 0.0
cv1 = 0.0
cv2 = 0.0
for i in range(ws.shape[1]):
cv1 += np.abs(ws[x1,i] - ws[x2,i])
cv2 += np.abs(ws[x1,i] - ws[x3,i])
if cv1 == 0:
f11 += 1
if cv2 == 0:
f11 += 1
y = 1.0
for i in range(ws.shape[1]):
y *= np.abs(2.0*ws[x1,i] + ws[x2,i] + ws[x3,i] - 2)
y -= f11
re[x1,x2,x3] = 1.0*r/(a**l-2)*y
f11 *= 1.0*(1-r)/2
re[x1,x2,x3] += f11
所以一些简单的测试数据:
a = 2
l = 5
r = 0.2
wp = (numpy.arange(2**l)[:,None] >> numpy.arange(l)[::-1]) & 1
wp = numpy.hstack([wp.sum(1,keepdims=True), wp])
ws = wp[:, 3:l+3]
re = numpy.zeros((a**l, a**l, a**l))
现在让我们检查所有三个函数是否产生相同的结果:
re = numpy.zeros((a**l, a**l, a**l))
func(re, ws, a, l, r)
re2 = numpy.zeros((a**l, a**l, a**l))
func2(re2, ws, a, l, r)
re3 = numpy.zeros((a**l, a**l, a**l))
func3(re3, ws, a, l, r)
print np.allclose(re, re2) # True
print np.allclose(re, re3) # True
以及一些使用 jupyter notebook 的初始计时 %timeit
魔法:
%timeit func(re, ws, a, l, r)
%timeit func2(re2, ws, a, l, r)
%timeit func3(re3, ws, a, l, r)
1 loop, best of 3: 404 ms per loop
100 loops, best of 3: 14.2 ms per loop
1000 loops, best of 3: 605 µs per loop
func2
比原始实现快约 28 倍。 func3
快了约 680 倍。请注意,我 运行 使用的是配备 i7 处理器、16 GB RAM 并使用 Numba 0.25.0 的 Macbook 笔记本电脑。
好的,现在让我们为每个人都在绞尽脑汁的 a=2
l=10
案例计时:
a = 2
l = 10
r = 0.2
wp = (numpy.arange(2**l)[:,None] >> numpy.arange(l)[::-1]) & 1
wp = numpy.hstack([wp.sum(1,keepdims=True), wp])
ws = wp[:, 3:l+3]
re = numpy.zeros((a**l, a**l, a**l))
print 'setup complete'
%timeit -n 1 -r 1 func3(re, ws, a, l, r)
# setup complete
# 1 loop, best of 1: 45.4 s per loop
所以这在我的单线程机器上花费了 45 秒,如果你不这样做的话,这似乎是合理的然后计算太多次。