加速正态分布概率质量分配
Speeding up normal distribution probability mass allocation
我们有 N 位用户,平均 P 位用户。每个用户的点数,其中每个点是 0 到 1 之间的单个值。我们需要使用已知密度为 0.05 的正态分布来分布每个点的质量,因为这些点具有一些不确定性。此外,我们需要将质量包裹在 0 和 1 周围,例如0.95 处的点也将在 0 附近分配质量。我在下面提供了一个工作示例,它将正态分布分到 D=50 个分箱。该示例使用 Python 键入模块,但如果您愿意,可以忽略它。
from typing import List, Any
import numpy as np
import scipy.stats
import matplotlib.pyplot as plt
D = 50
BINS: List[float] = np.linspace(0, 1, D + 1).tolist()
def probability_mass(distribution: Any, x0: float, x1: float) -> float:
"""
Computes the area under the distribution, wrapping at 1.
The wrapping is done by adding the PDF at +- 1.
"""
assert x1 > x0
return (
(distribution.cdf(x1) - distribution.cdf(x0))
+ (distribution.cdf(x1 + 1) - distribution.cdf(x0 + 1))
+ (distribution.cdf(x1 - 1) - distribution.cdf(x0 - 1))
)
def point_density(x: float) -> List[float]:
distribution: Any = scipy.stats.norm(loc=x, scale=0.05)
density: List[float] = []
for i in range(D):
density.append(probability_mass(distribution, BINS[i], BINS[i + 1]))
return density
def user_density(points: List[float]) -> Any:
# Find the density of each point
density: Any = np.array([point_density(p) for p in points])
# Combine points and normalize
combined = density.sum(axis=0)
return combined / combined.sum()
if __name__ == "__main__":
# Example for one user
data: List[float] = [.05, .3, .5, .5]
density = user_density(data)
# Example for multiple users (N = 2)
print([user_density(x) for x in [[.3, .5], [.7, .7, .7, .9]]])
### NB: THE REMAINING CODE IS FOR ILLUSTRATION ONLY!
### NB: THE IMPORTANT THING IS TO COMPUTE THE DENSITY FAST!
middle: List[float] = []
for i in range(D):
middle.append((BINS[i] + BINS[i + 1]) / 2)
plt.bar(x=middle, height=density, width=1.0 / D + 0.001)
plt.xlim(0, 1)
plt.xlabel("x")
plt.ylabel("Density")
plt.show()
本例中N=1,D=50,P=4。但是,我们希望将这种方法扩展到 N=10000 和 P=100,同时尽可能快。我不清楚我们如何向量化这种方法。我们如何才能最好地加快速度?
编辑
更快的解决方案可能会产生略有不同的结果。例如,它可以近似正态分布而不是使用精确的正态分布。
EDIT2
我们只关心使用 user_density()
函数计算 density
。该图仅用于帮助解释该方法。我们不关心情节本身:)
EDIT3
注意 P 是平均值。每个用户的积分。有些用户可能拥有更多,有些可能拥有更少。如果有帮助,您可以假设我们可以丢弃点数,以便所有用户最多拥有 2 * P 点数。只要解决方案可以处理每个用户灵活的点数,就可以在进行基准测试时忽略这一部分。
这将是我的矢量化方法:
data = np.array([0.05, 0.3, 0.5, 0.5])
np.random.seed(31415)
# random noise
randoms = np.random.normal(0,1,(len(data), int(1e5))) * 0.05
# samples with noise
samples = data[:,None] + randoms
# wrap [0,1]
samples = (samples % 1).ravel()
# histogram
hist, bins, patches = plt.hist(samples, bins=BINS, density=True)
输出:
我能够将时间从每个 100 个数据点的样本约 4 秒减少到每个样本约 1 毫秒。
在我看来,您似乎花费了大量时间来模拟大量正态分布。由于无论如何您都在处理非常大的样本量,因此您也可以只使用标准正态分布值,因为无论如何它都会取平均值。
我重新创建了您的方法 (BaseMethod class),然后创建了一个优化的 class (OptimizedMethod class),并使用 timeit 装饰器对其进行了评估。我的方法的主要区别在于以下行:
# Generate a standardized set of values to add to each sample to simulate normal distribution
self.norm_vals = np.array([norm.ppf(x / norm_val_n) * 0.05 for x in range(1, norm_val_n, 1)])
这将基于逆正态累积分布函数创建一组通用数据点,我们可以将其添加到每个数据点以模拟围绕该点的正态分布。然后我们只是将数据重塑为用户样本和 运行 np.histogram 样本。
import numpy as np
import scipy.stats
from scipy.stats import norm
import time
# timeit decorator for evaluating performance
def timeit(method):
def timed(*args, **kw):
ts = time.time()
result = method(*args, **kw)
te = time.time()
print('%r %2.2f ms' % (method.__name__, (te - ts) * 1000 ))
return result
return timed
# Define Variables
N = 10000
D = 50
P = 100
# Generate sample data
np.random.seed(0)
data = np.random.rand(N, P)
# Run OP's method for comparison
class BaseMethod:
def __init__(self, d=50):
self.d = d
self.bins = np.linspace(0, 1, d + 1).tolist()
def probability_mass(self, distribution, x0, x1):
"""
Computes the area under the distribution, wrapping at 1.
The wrapping is done by adding the PDF at +- 1.
"""
assert x1 > x0
return (
(distribution.cdf(x1) - distribution.cdf(x0))
+ (distribution.cdf(x1 + 1) - distribution.cdf(x0 + 1))
+ (distribution.cdf(x1 - 1) - distribution.cdf(x0 - 1))
)
def point_density(self, x):
distribution = scipy.stats.norm(loc=x, scale=0.05)
density = []
for i in range(self.d):
density.append(self.probability_mass(distribution, self.bins[i], self.bins[i + 1]))
return density
@timeit
def base_user_density(self, data):
n = data.shape[0]
density = np.empty((n, self.d))
for i in range(data.shape[0]):
# Find the density of each point
row_density = np.array([self.point_density(p) for p in data[i]])
# Combine points and normalize
combined = row_density.sum(axis=0)
density[i, :] = combined / combined.sum()
return density
base = BaseMethod(d=D)
# Only running base method on first 2 rows of data because it's slow
density = base.base_user_density(data[:2])
print(density[:2, :5])
class OptimizedMethod:
def __init__(self, d=50, norm_val_n=50):
self.d = d
self.norm_val_n = norm_val_n
self.bins = np.linspace(0, 1, d + 1).tolist()
# Generate a standardized set of values to add to each sample to simulate normal distribution
self.norm_vals = np.array([norm.ppf(x / norm_val_n) * 0.05 for x in range(1, norm_val_n, 1)])
@timeit
def optimized_user_density(self, data):
samples = np.empty((data.shape[0], data.shape[1], self.norm_val_n - 1))
# transform datapoints to normal distributions around datapoint
for i in range(self.norm_vals.shape[0]):
samples[:, :, i] = data + self.norm_vals[i]
samples = samples.reshape(samples.shape[0], -1)
#wrap around [0, 1]
samples = samples % 1
#loop over samples for density
density = np.empty((data.shape[0], self.d))
for i in range(samples.shape[0]):
hist, bins = np.histogram(samples[i], bins=self.bins)
density[i, :] = hist / hist.sum()
return density
om = OptimizedMethod()
#Run optimized method on first 2 rows for apples to apples comparison
density = om.optimized_user_density(data[:2])
#Run optimized method on full data
density = om.optimized_user_density(data)
print(density[:2, :5])
运行 在我的系统上,原始方法在 2 行数据上 运行 花费了大约 8.4 秒,而优化方法在 2 行数据上花费了 1 毫秒 运行数据并在 4.7 秒内完成 10,000 行。我为每种方法打印了前 2 个样本的前 5 个值。
'base_user_density' 8415.03 ms
[[0.02176227 0.02278653 0.02422535 0.02597123 0.02745976]
[0.0175103 0.01638513 0.01524853 0.01432158 0.01391156]]
'optimized_user_density' 1.09 ms
'optimized_user_density' 4755.49 ms
[[0.02142857 0.02244898 0.02530612 0.02612245 0.0277551 ]
[0.01673469 0.01653061 0.01510204 0.01428571 0.01326531]]
对于最大的情况(N=10000,AVG[P]=100,D=50),您可以获得 低于 50 毫秒 通过使用 FFT 并创建 data
以 numpy 友好的格式。否则它将接近 300 毫秒。
我们的想法是将以 0 为中心的单个正态分布与一系列狄拉克增量进行卷积。
见下图:
使用循环卷积解决了两个问题。
- 自然处理边缘包裹
- 可以用FFT高效计算,Convolution Theorem
第一个必须创建要复制的分发。函数 mk_bell()
创建了以 0 为中心的 stddev 0.05 正态分布的直方图。
分布环绕 1。可以在这里使用 arbitrary 分布。计算分布的频谱用于快速卷积。
接下来创建一个类似梳状的函数。峰值位于对应于用户密度峰值的索引处。例如
peaks_location = [0.1, 0.3, 0.7]
D = 10
映射到
peak_index = (D * peak_location).astype(int) = [1, 3, 7]
dist = [0, 1, 0, 1, 0, 0, 0, 1, 0, 0, 0] # ones at [1, 3, 7]
您可以借助 np.bincount()
函数计算每个峰值位置的 bin 索引,从而快速创建 Diract Deltas 的组合。
为了进一步加快速度,可以并行计算用户峰值的梳状函数。
数组 dist
是形状 NxD
的二维数组。它可以线性化为形状 (N*D)
的一维数组。在此更改之后,位置 [user_id, peak_index]
上的元素将可以从索引 user_id*D + peak_index
访问。
使用 numpy 友好的输入格式(如下所述),此操作很容易矢量化。
卷积定理说两个信号的卷积的频谱等于每个信号的频谱的乘积。
频谱是使用 numpy.fft.rfft
计算的,它是专用于纯实信号(无虚部)的快速傅里叶变换的变体。
Numpy 允许使用一个命令计算较大矩阵的每一行的 FFT。
接下来通过简单的乘法计算卷积的频谱,利用广播
接下来,通过 numpy.fft.irfft
中实现的傅里叶逆变换将频谱计算回 "time" 域。
要充分利用 numpy 的速度,应避免使用可变大小的数据结构并保持固定大小的数组。我建议将输入数据表示为三个数组。
uids
用户标识符,整数0..N-1
peaks
,峰的位置
mass
,peek 的质量,目前是 1/numer-of-peaks-for-user
这种数据表示允许快速矢量化处理。
例如:
user_data = [[0.1, 0.3], [0.5]]
映射到:
uids = [0, 0, 1] # 2 points for user_data[0], one from user_data[1]
peaks = [0.1, 0.3, 0.5] # serialized user_data
mass = [0.5, 0.5, 1] # scaling factors for each peak, 0.5 means 2 peaks for user 0
代码:
import numpy as np
import matplotlib.pyplot as plt
import time
def mk_bell(D, SIGMA):
# computes normal distribution wrapped and centered at zero
x = np.linspace(0, 1, D, endpoint=False);
x = (x + 0.5) % 1 - 0.5
bell = np.exp(-0.5*np.square(x / SIGMA))
return bell / bell.sum()
def user_densities_by_fft(uids, peaks, mass, D, N=None):
bell = mk_bell(D, 0.05).astype('f4')
sbell = np.fft.rfft(bell)
if N is None:
N = uids.max() + 1
# ensure that peaks are in [0..1) internal
peaks = peaks - np.floor(peaks)
# convert peak location from 0-1 to the indices
pidx = (D * (peaks + uids)).astype('i4')
dist = np.bincount(pidx, mass, N * D).reshape(N, D)
# process all users at once with Convolution Theorem
sdist = np.fft.rfft(dist)
sdist *= sbell
res = np.fft.irfft(sdist)
return res
def generate_data(N, Pmean):
# generateor for large data
data = []
for n in range(N):
# select P uniformly from 1..2*Pmean
P = np.random.randint(2 * Pmean) + 1
# select peak locations
chunk = np.random.uniform(size=P)
data.append(chunk.tolist())
return data
def make_data_numpy_friendly(data):
uids = []
chunks = []
mass = []
for uid, peaks in enumerate(data):
uids.append(np.full(len(peaks), uid))
mass.append(np.full(len(peaks), 1 / len(peaks)))
chunks.append(peaks)
return np.hstack(uids), np.hstack(chunks), np.hstack(mass)
D = 50
# demo for simple multi-distribution
data, N = [[0, .5], [.7, .7, .7, .9], [0.05, 0.3, 0.5, 0.5]], None
uids, peaks, mass = make_data_numpy_friendly(data)
dist = user_densities_by_fft(uids, peaks, mass, D, N)
plt.plot(dist.T)
plt.show()
# the actual measurement
N = 10000
P = 100
data = generate_data(N, P)
tic = time.time()
uids, peaks, mass = make_data_numpy_friendly(data)
toc = time.time()
print(f"make_data_numpy_friendly: {toc - tic}")
tic = time.time()
dist = user_densities_by_fft(uids, peaks, mass, D, N)
toc = time.time()
print(f"user_densities_by_fft: {toc - tic}")
我的 4 核 Haswell 机器上的结果是:
make_data_numpy_friendly: 0.2733159065246582
user_densities_by_fft: 0.04064297676086426
处理数据用了40ms。请注意,将数据处理为 numpy 友好格式所花费的时间是实际计算分布所花费的时间的 6 倍。
Python 在循环方面真的很慢。
因此,我强烈建议首先以 numpy 友好的方式直接生成输入数据。
有一些问题需要解决:
- 精度,可以通过使用更大的
D
和下采样 来提高
- 可以通过加宽尖峰进一步提高峰定位的准确性。
- 性能,
scipy.fft
提供可能更快的 FFT 实现的移动变体
我们有 N 位用户,平均 P 位用户。每个用户的点数,其中每个点是 0 到 1 之间的单个值。我们需要使用已知密度为 0.05 的正态分布来分布每个点的质量,因为这些点具有一些不确定性。此外,我们需要将质量包裹在 0 和 1 周围,例如0.95 处的点也将在 0 附近分配质量。我在下面提供了一个工作示例,它将正态分布分到 D=50 个分箱。该示例使用 Python 键入模块,但如果您愿意,可以忽略它。
from typing import List, Any
import numpy as np
import scipy.stats
import matplotlib.pyplot as plt
D = 50
BINS: List[float] = np.linspace(0, 1, D + 1).tolist()
def probability_mass(distribution: Any, x0: float, x1: float) -> float:
"""
Computes the area under the distribution, wrapping at 1.
The wrapping is done by adding the PDF at +- 1.
"""
assert x1 > x0
return (
(distribution.cdf(x1) - distribution.cdf(x0))
+ (distribution.cdf(x1 + 1) - distribution.cdf(x0 + 1))
+ (distribution.cdf(x1 - 1) - distribution.cdf(x0 - 1))
)
def point_density(x: float) -> List[float]:
distribution: Any = scipy.stats.norm(loc=x, scale=0.05)
density: List[float] = []
for i in range(D):
density.append(probability_mass(distribution, BINS[i], BINS[i + 1]))
return density
def user_density(points: List[float]) -> Any:
# Find the density of each point
density: Any = np.array([point_density(p) for p in points])
# Combine points and normalize
combined = density.sum(axis=0)
return combined / combined.sum()
if __name__ == "__main__":
# Example for one user
data: List[float] = [.05, .3, .5, .5]
density = user_density(data)
# Example for multiple users (N = 2)
print([user_density(x) for x in [[.3, .5], [.7, .7, .7, .9]]])
### NB: THE REMAINING CODE IS FOR ILLUSTRATION ONLY!
### NB: THE IMPORTANT THING IS TO COMPUTE THE DENSITY FAST!
middle: List[float] = []
for i in range(D):
middle.append((BINS[i] + BINS[i + 1]) / 2)
plt.bar(x=middle, height=density, width=1.0 / D + 0.001)
plt.xlim(0, 1)
plt.xlabel("x")
plt.ylabel("Density")
plt.show()
本例中N=1,D=50,P=4。但是,我们希望将这种方法扩展到 N=10000 和 P=100,同时尽可能快。我不清楚我们如何向量化这种方法。我们如何才能最好地加快速度?
编辑
更快的解决方案可能会产生略有不同的结果。例如,它可以近似正态分布而不是使用精确的正态分布。
EDIT2
我们只关心使用 user_density()
函数计算 density
。该图仅用于帮助解释该方法。我们不关心情节本身:)
EDIT3
注意 P 是平均值。每个用户的积分。有些用户可能拥有更多,有些可能拥有更少。如果有帮助,您可以假设我们可以丢弃点数,以便所有用户最多拥有 2 * P 点数。只要解决方案可以处理每个用户灵活的点数,就可以在进行基准测试时忽略这一部分。
这将是我的矢量化方法:
data = np.array([0.05, 0.3, 0.5, 0.5])
np.random.seed(31415)
# random noise
randoms = np.random.normal(0,1,(len(data), int(1e5))) * 0.05
# samples with noise
samples = data[:,None] + randoms
# wrap [0,1]
samples = (samples % 1).ravel()
# histogram
hist, bins, patches = plt.hist(samples, bins=BINS, density=True)
输出:
我能够将时间从每个 100 个数据点的样本约 4 秒减少到每个样本约 1 毫秒。
在我看来,您似乎花费了大量时间来模拟大量正态分布。由于无论如何您都在处理非常大的样本量,因此您也可以只使用标准正态分布值,因为无论如何它都会取平均值。
我重新创建了您的方法 (BaseMethod class),然后创建了一个优化的 class (OptimizedMethod class),并使用 timeit 装饰器对其进行了评估。我的方法的主要区别在于以下行:
# Generate a standardized set of values to add to each sample to simulate normal distribution
self.norm_vals = np.array([norm.ppf(x / norm_val_n) * 0.05 for x in range(1, norm_val_n, 1)])
这将基于逆正态累积分布函数创建一组通用数据点,我们可以将其添加到每个数据点以模拟围绕该点的正态分布。然后我们只是将数据重塑为用户样本和 运行 np.histogram 样本。
import numpy as np
import scipy.stats
from scipy.stats import norm
import time
# timeit decorator for evaluating performance
def timeit(method):
def timed(*args, **kw):
ts = time.time()
result = method(*args, **kw)
te = time.time()
print('%r %2.2f ms' % (method.__name__, (te - ts) * 1000 ))
return result
return timed
# Define Variables
N = 10000
D = 50
P = 100
# Generate sample data
np.random.seed(0)
data = np.random.rand(N, P)
# Run OP's method for comparison
class BaseMethod:
def __init__(self, d=50):
self.d = d
self.bins = np.linspace(0, 1, d + 1).tolist()
def probability_mass(self, distribution, x0, x1):
"""
Computes the area under the distribution, wrapping at 1.
The wrapping is done by adding the PDF at +- 1.
"""
assert x1 > x0
return (
(distribution.cdf(x1) - distribution.cdf(x0))
+ (distribution.cdf(x1 + 1) - distribution.cdf(x0 + 1))
+ (distribution.cdf(x1 - 1) - distribution.cdf(x0 - 1))
)
def point_density(self, x):
distribution = scipy.stats.norm(loc=x, scale=0.05)
density = []
for i in range(self.d):
density.append(self.probability_mass(distribution, self.bins[i], self.bins[i + 1]))
return density
@timeit
def base_user_density(self, data):
n = data.shape[0]
density = np.empty((n, self.d))
for i in range(data.shape[0]):
# Find the density of each point
row_density = np.array([self.point_density(p) for p in data[i]])
# Combine points and normalize
combined = row_density.sum(axis=0)
density[i, :] = combined / combined.sum()
return density
base = BaseMethod(d=D)
# Only running base method on first 2 rows of data because it's slow
density = base.base_user_density(data[:2])
print(density[:2, :5])
class OptimizedMethod:
def __init__(self, d=50, norm_val_n=50):
self.d = d
self.norm_val_n = norm_val_n
self.bins = np.linspace(0, 1, d + 1).tolist()
# Generate a standardized set of values to add to each sample to simulate normal distribution
self.norm_vals = np.array([norm.ppf(x / norm_val_n) * 0.05 for x in range(1, norm_val_n, 1)])
@timeit
def optimized_user_density(self, data):
samples = np.empty((data.shape[0], data.shape[1], self.norm_val_n - 1))
# transform datapoints to normal distributions around datapoint
for i in range(self.norm_vals.shape[0]):
samples[:, :, i] = data + self.norm_vals[i]
samples = samples.reshape(samples.shape[0], -1)
#wrap around [0, 1]
samples = samples % 1
#loop over samples for density
density = np.empty((data.shape[0], self.d))
for i in range(samples.shape[0]):
hist, bins = np.histogram(samples[i], bins=self.bins)
density[i, :] = hist / hist.sum()
return density
om = OptimizedMethod()
#Run optimized method on first 2 rows for apples to apples comparison
density = om.optimized_user_density(data[:2])
#Run optimized method on full data
density = om.optimized_user_density(data)
print(density[:2, :5])
运行 在我的系统上,原始方法在 2 行数据上 运行 花费了大约 8.4 秒,而优化方法在 2 行数据上花费了 1 毫秒 运行数据并在 4.7 秒内完成 10,000 行。我为每种方法打印了前 2 个样本的前 5 个值。
'base_user_density' 8415.03 ms
[[0.02176227 0.02278653 0.02422535 0.02597123 0.02745976]
[0.0175103 0.01638513 0.01524853 0.01432158 0.01391156]]
'optimized_user_density' 1.09 ms
'optimized_user_density' 4755.49 ms
[[0.02142857 0.02244898 0.02530612 0.02612245 0.0277551 ]
[0.01673469 0.01653061 0.01510204 0.01428571 0.01326531]]
对于最大的情况(N=10000,AVG[P]=100,D=50),您可以获得 低于 50 毫秒 通过使用 FFT 并创建 data
以 numpy 友好的格式。否则它将接近 300 毫秒。
我们的想法是将以 0 为中心的单个正态分布与一系列狄拉克增量进行卷积。
见下图:
使用循环卷积解决了两个问题。
- 自然处理边缘包裹
- 可以用FFT高效计算,Convolution Theorem
第一个必须创建要复制的分发。函数 mk_bell()
创建了以 0 为中心的 stddev 0.05 正态分布的直方图。
分布环绕 1。可以在这里使用 arbitrary 分布。计算分布的频谱用于快速卷积。
接下来创建一个类似梳状的函数。峰值位于对应于用户密度峰值的索引处。例如
peaks_location = [0.1, 0.3, 0.7]
D = 10
映射到
peak_index = (D * peak_location).astype(int) = [1, 3, 7]
dist = [0, 1, 0, 1, 0, 0, 0, 1, 0, 0, 0] # ones at [1, 3, 7]
您可以借助 np.bincount()
函数计算每个峰值位置的 bin 索引,从而快速创建 Diract Deltas 的组合。
为了进一步加快速度,可以并行计算用户峰值的梳状函数。
数组 dist
是形状 NxD
的二维数组。它可以线性化为形状 (N*D)
的一维数组。在此更改之后,位置 [user_id, peak_index]
上的元素将可以从索引 user_id*D + peak_index
访问。
使用 numpy 友好的输入格式(如下所述),此操作很容易矢量化。
卷积定理说两个信号的卷积的频谱等于每个信号的频谱的乘积。
频谱是使用 numpy.fft.rfft
计算的,它是专用于纯实信号(无虚部)的快速傅里叶变换的变体。
Numpy 允许使用一个命令计算较大矩阵的每一行的 FFT。
接下来通过简单的乘法计算卷积的频谱,利用广播
接下来,通过 numpy.fft.irfft
中实现的傅里叶逆变换将频谱计算回 "time" 域。
要充分利用 numpy 的速度,应避免使用可变大小的数据结构并保持固定大小的数组。我建议将输入数据表示为三个数组。
uids
用户标识符,整数0..N-1peaks
,峰的位置mass
,peek 的质量,目前是 1/numer-of-peaks-for-user
这种数据表示允许快速矢量化处理。 例如:
user_data = [[0.1, 0.3], [0.5]]
映射到:
uids = [0, 0, 1] # 2 points for user_data[0], one from user_data[1]
peaks = [0.1, 0.3, 0.5] # serialized user_data
mass = [0.5, 0.5, 1] # scaling factors for each peak, 0.5 means 2 peaks for user 0
代码:
import numpy as np
import matplotlib.pyplot as plt
import time
def mk_bell(D, SIGMA):
# computes normal distribution wrapped and centered at zero
x = np.linspace(0, 1, D, endpoint=False);
x = (x + 0.5) % 1 - 0.5
bell = np.exp(-0.5*np.square(x / SIGMA))
return bell / bell.sum()
def user_densities_by_fft(uids, peaks, mass, D, N=None):
bell = mk_bell(D, 0.05).astype('f4')
sbell = np.fft.rfft(bell)
if N is None:
N = uids.max() + 1
# ensure that peaks are in [0..1) internal
peaks = peaks - np.floor(peaks)
# convert peak location from 0-1 to the indices
pidx = (D * (peaks + uids)).astype('i4')
dist = np.bincount(pidx, mass, N * D).reshape(N, D)
# process all users at once with Convolution Theorem
sdist = np.fft.rfft(dist)
sdist *= sbell
res = np.fft.irfft(sdist)
return res
def generate_data(N, Pmean):
# generateor for large data
data = []
for n in range(N):
# select P uniformly from 1..2*Pmean
P = np.random.randint(2 * Pmean) + 1
# select peak locations
chunk = np.random.uniform(size=P)
data.append(chunk.tolist())
return data
def make_data_numpy_friendly(data):
uids = []
chunks = []
mass = []
for uid, peaks in enumerate(data):
uids.append(np.full(len(peaks), uid))
mass.append(np.full(len(peaks), 1 / len(peaks)))
chunks.append(peaks)
return np.hstack(uids), np.hstack(chunks), np.hstack(mass)
D = 50
# demo for simple multi-distribution
data, N = [[0, .5], [.7, .7, .7, .9], [0.05, 0.3, 0.5, 0.5]], None
uids, peaks, mass = make_data_numpy_friendly(data)
dist = user_densities_by_fft(uids, peaks, mass, D, N)
plt.plot(dist.T)
plt.show()
# the actual measurement
N = 10000
P = 100
data = generate_data(N, P)
tic = time.time()
uids, peaks, mass = make_data_numpy_friendly(data)
toc = time.time()
print(f"make_data_numpy_friendly: {toc - tic}")
tic = time.time()
dist = user_densities_by_fft(uids, peaks, mass, D, N)
toc = time.time()
print(f"user_densities_by_fft: {toc - tic}")
我的 4 核 Haswell 机器上的结果是:
make_data_numpy_friendly: 0.2733159065246582
user_densities_by_fft: 0.04064297676086426
处理数据用了40ms。请注意,将数据处理为 numpy 友好格式所花费的时间是实际计算分布所花费的时间的 6 倍。 Python 在循环方面真的很慢。 因此,我强烈建议首先以 numpy 友好的方式直接生成输入数据。
有一些问题需要解决:
- 精度,可以通过使用更大的
D
和下采样 来提高
- 可以通过加宽尖峰进一步提高峰定位的准确性。
- 性能,
scipy.fft
提供可能更快的 FFT 实现的移动变体