如何在numpy中向量化傅立叶级数部分和
How to vectorize fourier series partial sum in numpy
给定周期为 T
和 t
的函数的傅里叶级数系数 a[n]
和 b[n]
(分别针对余弦和正弦),以下等距间隔代码将评估区间 t
中所有点的部分和(a
、b
、t
都是 numpy
数组)。明确了 len(t) <> len(a).
yn=ones(len(t))*a[0]
for n in range(1,len(a)):
yn=yn+(a[n]*cos(2*pi*n*t/T)-b[n]*sin(2*pi*n*t/T))
我的问题是:这个for循环可以向量化吗?
这是一种矢量化方法,将 broadcasting
to create the 2D
array version of cosine/sine input : 2*pi*n*t/T
and then using matrix-multiplication
with np.dot
用于 sum-reduction
-
r = np.arange(1,len(a))
S = 2*np.pi*r[:,None]*t/T
cS = np.cos(S)
sS = np.sin(S)
out = a[1:].dot(cS) - b[1:].dot(sS) + a[0]
性能进一步提升
为了进一步提升,我们可以利用 numexpr
module 来计算那些三角步 -
import numexpr as ne
cS = ne.evaluate('cos(S)')
sS = ne.evaluate('sin(S)')
运行时测试-
接近 -
def original_app(t,a,b,T):
yn=np.ones(len(t))*a[0]
for n in range(1,len(a)):
yn=yn+(a[n]*np.cos(2*np.pi*n*t/T)-b[n]*np.sin(2*np.pi*n*t/T))
return yn
def vectorized_app(t,a,b,T):
r = np.arange(1,len(a))
S = (2*np.pi/T)*r[:,None]*t
cS = np.cos(S)
sS = np.sin(S)
return a[1:].dot(cS) - b[1:].dot(sS) + a[0]
def vectorized_app_v2(t,a,b,T):
r = np.arange(1,len(a))
S = (2*np.pi/T)*r[:,None]*t
cS = ne.evaluate('cos(S)')
sS = ne.evaluate('sin(S)')
return a[1:].dot(cS) - b[1:].dot(sS) + a[0]
另外,包括来自@Paul Panzer 的 post.
的函数 PP
计时 -
In [22]: # Setup inputs
...: n = 10000
...: t = np.random.randint(0,9,(n))
...: a = np.random.randint(0,9,(n))
...: b = np.random.randint(0,9,(n))
...: T = 3.45
...:
In [23]: print np.allclose(original_app(t,a,b,T), vectorized_app(t,a,b,T))
...: print np.allclose(original_app(t,a,b,T), vectorized_app_v2(t,a,b,T))
...: print np.allclose(original_app(t,a,b,T), PP(t,a,b,T))
...:
True
True
True
In [25]: %timeit original_app(t,a,b,T)
...: %timeit vectorized_app(t,a,b,T)
...: %timeit vectorized_app_v2(t,a,b,T)
...: %timeit PP(t,a,b,T)
...:
1 loops, best of 3: 6.49 s per loop
1 loops, best of 3: 6.24 s per loop
1 loops, best of 3: 1.54 s per loop
1 loops, best of 3: 1.96 s per loop
无法击败 numexpr,但如果它不可用,我们可以节省先验数(测试和基准测试代码主要基于 @Divakar 的代码,以防您没有注意到 ;-)):
import numpy as np
from timeit import timeit
def PP(t,a,b,T):
CS = np.empty((len(t), len(a)-1), np.complex)
CS[...] = np.exp(2j*np.pi*(t[:, None])/T)
np.cumprod(CS, axis=-1, out=CS)
return a[1:].dot(CS.T.real) - b[1:].dot(CS.T.imag) + a[0]
def original_app(t,a,b,T):
yn=np.ones(len(t))*a[0]
for n in range(1,len(a)):
yn=yn+(a[n]*np.cos(2*np.pi*n*t/T)-b[n]*np.sin(2*np.pi*n*t/T))
return yn
def vectorized_app(t,a,b,T):
r = np.arange(1,len(a))
S = 2*np.pi*r[:,None]*t/T
cS = np.cos(S)
sS = np.sin(S)
return a[1:].dot(cS) - b[1:].dot(sS) + a[0]
n = 1000
t = 2000
t = np.random.randint(0,9,(t))
a = np.random.randint(0,9,(n))
b = np.random.randint(0,9,(n))
T = 3.45
print(np.allclose(original_app(t,a,b,T), vectorized_app(t,a,b,T)))
print(np.allclose(original_app(t,a,b,T), PP(t,a,b,T)))
print('{:18s} {:9.6f}'.format('orig', timeit(lambda: original_app(t,a,b,T), number=10)/10))
print('{:18s} {:9.6f}'.format('Divakar no numexpr', timeit(lambda: vectorized_app(t,a,b,T), number=10)/10))
print('{:18s} {:9.6f}'.format('PP', timeit(lambda: PP(t,a,b,T), number=10)/10))
打印:
True
True
orig 0.166903
Divakar no numexpr 0.179617
PP 0.060817
顺便说一句。如果 delta t
除 T
一个人可以节省更多,甚至 运行 完整的 fft 并丢弃太多的东西。
这不是真正的另一个答案,而是对@Paul Panzer 的评论,写成答案是因为我需要 post 一些代码。如果有办法在评论中post正确格式化代码,请指教。
受到@Paul Panzer cumprod
想法的启发,我想出了以下内容:
an = ones((len(a)-1,len(te)))*2j*pi*te/T
CS = exp(cumsum(an,axis=0))
out = (a[1:].dot(CS.real) - b[1:].dot(CS.imag)) + a[0]
虽然它似乎被适当地矢量化并产生了正确的结果,但它的性能很糟糕。它不仅比 cumprod
慢得多,预计会进行更多的 len(a)-1
指数运算,而且比原始的非矢量化版本慢 50%。性能不佳的原因是什么?
给定周期为 T
和 t
的函数的傅里叶级数系数 a[n]
和 b[n]
(分别针对余弦和正弦),以下等距间隔代码将评估区间 t
中所有点的部分和(a
、b
、t
都是 numpy
数组)。明确了 len(t) <> len(a).
yn=ones(len(t))*a[0]
for n in range(1,len(a)):
yn=yn+(a[n]*cos(2*pi*n*t/T)-b[n]*sin(2*pi*n*t/T))
我的问题是:这个for循环可以向量化吗?
这是一种矢量化方法,将 broadcasting
to create the 2D
array version of cosine/sine input : 2*pi*n*t/T
and then using matrix-multiplication
with np.dot
用于 sum-reduction
-
r = np.arange(1,len(a))
S = 2*np.pi*r[:,None]*t/T
cS = np.cos(S)
sS = np.sin(S)
out = a[1:].dot(cS) - b[1:].dot(sS) + a[0]
性能进一步提升
为了进一步提升,我们可以利用 numexpr
module 来计算那些三角步 -
import numexpr as ne
cS = ne.evaluate('cos(S)')
sS = ne.evaluate('sin(S)')
运行时测试-
接近 -
def original_app(t,a,b,T):
yn=np.ones(len(t))*a[0]
for n in range(1,len(a)):
yn=yn+(a[n]*np.cos(2*np.pi*n*t/T)-b[n]*np.sin(2*np.pi*n*t/T))
return yn
def vectorized_app(t,a,b,T):
r = np.arange(1,len(a))
S = (2*np.pi/T)*r[:,None]*t
cS = np.cos(S)
sS = np.sin(S)
return a[1:].dot(cS) - b[1:].dot(sS) + a[0]
def vectorized_app_v2(t,a,b,T):
r = np.arange(1,len(a))
S = (2*np.pi/T)*r[:,None]*t
cS = ne.evaluate('cos(S)')
sS = ne.evaluate('sin(S)')
return a[1:].dot(cS) - b[1:].dot(sS) + a[0]
另外,包括来自@Paul Panzer 的 post.
的函数PP
计时 -
In [22]: # Setup inputs
...: n = 10000
...: t = np.random.randint(0,9,(n))
...: a = np.random.randint(0,9,(n))
...: b = np.random.randint(0,9,(n))
...: T = 3.45
...:
In [23]: print np.allclose(original_app(t,a,b,T), vectorized_app(t,a,b,T))
...: print np.allclose(original_app(t,a,b,T), vectorized_app_v2(t,a,b,T))
...: print np.allclose(original_app(t,a,b,T), PP(t,a,b,T))
...:
True
True
True
In [25]: %timeit original_app(t,a,b,T)
...: %timeit vectorized_app(t,a,b,T)
...: %timeit vectorized_app_v2(t,a,b,T)
...: %timeit PP(t,a,b,T)
...:
1 loops, best of 3: 6.49 s per loop
1 loops, best of 3: 6.24 s per loop
1 loops, best of 3: 1.54 s per loop
1 loops, best of 3: 1.96 s per loop
无法击败 numexpr,但如果它不可用,我们可以节省先验数(测试和基准测试代码主要基于 @Divakar 的代码,以防您没有注意到 ;-)):
import numpy as np
from timeit import timeit
def PP(t,a,b,T):
CS = np.empty((len(t), len(a)-1), np.complex)
CS[...] = np.exp(2j*np.pi*(t[:, None])/T)
np.cumprod(CS, axis=-1, out=CS)
return a[1:].dot(CS.T.real) - b[1:].dot(CS.T.imag) + a[0]
def original_app(t,a,b,T):
yn=np.ones(len(t))*a[0]
for n in range(1,len(a)):
yn=yn+(a[n]*np.cos(2*np.pi*n*t/T)-b[n]*np.sin(2*np.pi*n*t/T))
return yn
def vectorized_app(t,a,b,T):
r = np.arange(1,len(a))
S = 2*np.pi*r[:,None]*t/T
cS = np.cos(S)
sS = np.sin(S)
return a[1:].dot(cS) - b[1:].dot(sS) + a[0]
n = 1000
t = 2000
t = np.random.randint(0,9,(t))
a = np.random.randint(0,9,(n))
b = np.random.randint(0,9,(n))
T = 3.45
print(np.allclose(original_app(t,a,b,T), vectorized_app(t,a,b,T)))
print(np.allclose(original_app(t,a,b,T), PP(t,a,b,T)))
print('{:18s} {:9.6f}'.format('orig', timeit(lambda: original_app(t,a,b,T), number=10)/10))
print('{:18s} {:9.6f}'.format('Divakar no numexpr', timeit(lambda: vectorized_app(t,a,b,T), number=10)/10))
print('{:18s} {:9.6f}'.format('PP', timeit(lambda: PP(t,a,b,T), number=10)/10))
打印:
True
True
orig 0.166903
Divakar no numexpr 0.179617
PP 0.060817
顺便说一句。如果 delta t
除 T
一个人可以节省更多,甚至 运行 完整的 fft 并丢弃太多的东西。
这不是真正的另一个答案,而是对@Paul Panzer 的评论,写成答案是因为我需要 post 一些代码。如果有办法在评论中post正确格式化代码,请指教。
受到@Paul Panzer cumprod
想法的启发,我想出了以下内容:
an = ones((len(a)-1,len(te)))*2j*pi*te/T
CS = exp(cumsum(an,axis=0))
out = (a[1:].dot(CS.real) - b[1:].dot(CS.imag)) + a[0]
虽然它似乎被适当地矢量化并产生了正确的结果,但它的性能很糟糕。它不仅比 cumprod
慢得多,预计会进行更多的 len(a)-1
指数运算,而且比原始的非矢量化版本慢 50%。性能不佳的原因是什么?