加速正态分布概率质量分配

Speeding up normal distribution probability mass allocation

我们有 N 位用户,平均 P 位用户。每个用户的点数,其中每个点是 0 到 1 之间的单个值。我们需要使用已知密度为 0.05 的正态分布来分布每个点的质量,因为这些点具有一些不确定性。此外,我们需要将质量包裹在 0 和 1 周围,例如0.95 处的点也将在 0 附近分配质量。我在下面提供了一个工作示例,它将正态分布分到 D=50 个分箱。该示例使用 Python 键入模块,但如果您愿意,可以忽略它。

from typing import List, Any
import numpy as np
import scipy.stats
import matplotlib.pyplot as plt

D = 50
BINS: List[float] = np.linspace(0, 1, D + 1).tolist()


def probability_mass(distribution: Any, x0: float, x1: float) -> float:
    """
    Computes the area under the distribution, wrapping at 1.
    The wrapping is done by adding the PDF at +- 1.
    """
    assert x1 > x0
    return (
        (distribution.cdf(x1) - distribution.cdf(x0))
        + (distribution.cdf(x1 + 1) - distribution.cdf(x0 + 1))
        + (distribution.cdf(x1 - 1) - distribution.cdf(x0 - 1))
    )


def point_density(x: float) -> List[float]:
    distribution: Any = scipy.stats.norm(loc=x, scale=0.05)
    density: List[float] = []
    for i in range(D):
        density.append(probability_mass(distribution, BINS[i], BINS[i + 1]))
    return density


def user_density(points: List[float]) -> Any:

    # Find the density of each point
    density: Any = np.array([point_density(p) for p in points])

    # Combine points and normalize
    combined = density.sum(axis=0)
    return combined / combined.sum()


if __name__ == "__main__":

    # Example for one user
    data: List[float] = [.05, .3, .5, .5]
    density = user_density(data)

    # Example for multiple users (N = 2)
    print([user_density(x) for x in [[.3, .5], [.7, .7, .7, .9]]])

    ### NB: THE REMAINING CODE IS FOR ILLUSTRATION ONLY!
    ### NB: THE IMPORTANT THING IS TO COMPUTE THE DENSITY FAST!
    middle: List[float] = []
    for i in range(D):
        middle.append((BINS[i] + BINS[i + 1]) / 2)
    plt.bar(x=middle, height=density, width=1.0 / D + 0.001)
    plt.xlim(0, 1)
    plt.xlabel("x")
    plt.ylabel("Density")
    plt.show()

本例中N=1D=50P=4。但是,我们希望将这种方法扩展到 N=10000P=100,同时尽可能快。我不清楚我们如何向量化这种方法。我们如何才能最好地加快速度?

编辑

更快的解决方案可能会产生略有不同的结果。例如,它可以近似正态分布而不是使用精确的正态分布。

EDIT2

我们只关心使用 user_density() 函数计算 density。该图仅用于帮助解释该方法。我们不关心情节本身:)

EDIT3

注意 P 是平均值。每个用户的积分。有些用户可能拥有更多,有些可能拥有更少。如果有帮助,您可以假设我们可以丢弃点数,以便所有用户最多拥有 2 * P 点数。只要解决方案可以处理每个用户灵活的点数,就可以在进行基准测试时忽略这一部分。

这将是我的矢量化方法:

data = np.array([0.05, 0.3, 0.5, 0.5])

np.random.seed(31415)
# random noise
randoms = np.random.normal(0,1,(len(data), int(1e5))) * 0.05

# samples with noise
samples = data[:,None] + randoms

# wrap [0,1]
samples = (samples % 1).ravel()


# histogram
hist, bins, patches = plt.hist(samples, bins=BINS, density=True)

输出:

我能够将时间从每个 100 个数据点的样本约 4 秒减少到每个样本约 1 毫秒。

在我看来,您似乎花费了大量时间来模拟大量正态分布。由于无论如何您都在处理非常大的样本量,因此您也可以只使用标准正态分布值,因为无论如何它都会取平均值。

我重新创建了您的方法 (BaseMethod class),然后创建了一个优化的 class (OptimizedMethod class),并使用 timeit 装饰器对其进行了评估。我的方法的主要区别在于以下行:

    # Generate a standardized set of values to add to each sample to simulate normal distribution
    self.norm_vals = np.array([norm.ppf(x / norm_val_n) * 0.05 for x in range(1, norm_val_n, 1)])

这将基于逆正态累积分布函数创建一组通用数据点,我们可以将其添加到每个数据点以模拟围绕该点的正态分布。然后我们只是将数据重塑为用户样本和 运行 np.histogram 样本。

import numpy as np
import scipy.stats
from scipy.stats import norm
import time

# timeit decorator for evaluating performance
def timeit(method):
    def timed(*args, **kw):
        ts = time.time()
        result = method(*args, **kw)
        te = time.time()
        print('%r  %2.2f ms' % (method.__name__, (te - ts) * 1000 ))
        return result
    return timed

# Define Variables
N = 10000
D = 50
P = 100

# Generate sample data
np.random.seed(0)
data = np.random.rand(N, P)

# Run OP's method for comparison
class BaseMethod:
    def __init__(self, d=50):
        self.d = d
        self.bins = np.linspace(0, 1, d + 1).tolist()

    def probability_mass(self, distribution, x0, x1):
        """
        Computes the area under the distribution, wrapping at 1.
        The wrapping is done by adding the PDF at +- 1.
        """
        assert x1 > x0
        return (
            (distribution.cdf(x1) - distribution.cdf(x0))
            + (distribution.cdf(x1 + 1) - distribution.cdf(x0 + 1))
            + (distribution.cdf(x1 - 1) - distribution.cdf(x0 - 1))
        )

    def point_density(self, x):
        distribution = scipy.stats.norm(loc=x, scale=0.05)
        density = []
        for i in range(self.d):
            density.append(self.probability_mass(distribution, self.bins[i], self.bins[i + 1]))
        return density

    @timeit
    def base_user_density(self, data):
        n = data.shape[0]
        density = np.empty((n, self.d))
        for i in range(data.shape[0]):
            # Find the density of each point
            row_density = np.array([self.point_density(p) for p in data[i]])
            # Combine points and normalize
            combined = row_density.sum(axis=0)
            density[i, :] = combined / combined.sum()
        return density


base = BaseMethod(d=D)
# Only running base method on first 2 rows of data because it's slow
density = base.base_user_density(data[:2])
print(density[:2, :5])


class OptimizedMethod:

    def __init__(self, d=50, norm_val_n=50):
        self.d = d
        self.norm_val_n = norm_val_n
        self.bins = np.linspace(0, 1, d + 1).tolist()

        # Generate a standardized set of values to add to each sample to simulate normal distribution
        self.norm_vals = np.array([norm.ppf(x / norm_val_n) * 0.05 for x in range(1, norm_val_n, 1)])

    @timeit
    def optimized_user_density(self, data):

        samples = np.empty((data.shape[0], data.shape[1], self.norm_val_n - 1))
        # transform datapoints to normal distributions around datapoint
        for i in range(self.norm_vals.shape[0]):
            samples[:, :, i] = data + self.norm_vals[i]
        samples = samples.reshape(samples.shape[0], -1)

        #wrap around [0, 1]
        samples = samples % 1

        #loop over samples for density
        density = np.empty((data.shape[0], self.d))
        for i in range(samples.shape[0]):
            hist, bins = np.histogram(samples[i], bins=self.bins)
            density[i, :] = hist / hist.sum()

        return density


om = OptimizedMethod()
#Run optimized method on first 2 rows for apples to apples comparison
density = om.optimized_user_density(data[:2])
#Run optimized method on full data
density = om.optimized_user_density(data)
print(density[:2, :5])

运行 在我的系统上,原始方法在 2 行数据上 运行 花费了大约 8.4 秒,而优化方法在 2 行数据上花费了 1 毫秒 运行数据并在 4.7 秒内完成 10,000 行。我为每种方法打印了前 2 个样本的前 5 个值。

'base_user_density'  8415.03 ms
[[0.02176227 0.02278653 0.02422535 0.02597123 0.02745976]
 [0.0175103  0.01638513 0.01524853 0.01432158 0.01391156]]
'optimized_user_density'  1.09 ms
'optimized_user_density'  4755.49 ms
[[0.02142857 0.02244898 0.02530612 0.02612245 0.0277551 ]
 [0.01673469 0.01653061 0.01510204 0.01428571 0.01326531]]

对于最大的情况(N=10000,AVG[P]=100,D=50),您可以获得 低于 50 毫秒 通过使用 FFT 并创建 data以 numpy 友好的格式。否则它将接近 300 毫秒。

我们的想法是将以 0 为中心的单个正态分布与一系​​列狄拉克增量进行卷积。

见下图:

使用循环卷积解决了两个问题。

第一个必须创建要复制的分发。函数 mk_bell() 创建了以 0 为中心的 stddev 0.05 正态分布的直方图。 分布环绕 1。可以在这里使用 arbitrary 分布。计算分布的频谱用于快速卷积。

接下来创建一个类似梳状的函数。峰值位于对应于用户密度峰值的索引处。例如

peaks_location = [0.1, 0.3, 0.7]
D = 10

映射到

peak_index = (D * peak_location).astype(int) = [1, 3, 7]
dist = [0, 1, 0, 1, 0, 0, 0, 1, 0, 0, 0] # ones at [1, 3, 7]

您可以借助 np.bincount() 函数计算每个峰值位置的 bin 索引,从而快速创建 Diract Deltas 的组合。 为了进一步加快速度,可以并行计算用户峰值的梳状函数。

数组 dist 是形状 NxD 的二维数组。它可以线性化为形状 (N*D) 的一维数组。在此更改之后,位置 [user_id, peak_index] 上的元素将可以从索引 user_id*D + peak_index 访问。 使用 numpy 友好的输入格式(如下所述),此操作很容易矢量化。

卷积定理说两个信号的卷积的频谱等于每个信号的频谱的乘积。

频谱是使用 numpy.fft.rfft 计算的,它是专用于纯实信号(无虚部)的快速傅里叶变换的变体。

Numpy 允许使用一个命令计算较大矩阵的每一行的 FFT。

接下来通过简单的乘法计算卷积的频谱,利用广播

接下来,通过 numpy.fft.irfft 中实现的傅里叶逆变换将频谱计算回 "time" 域。

要充分利用 numpy 的速度,应避免使用可变大小的数据结构并保持固定大小的数组。我建议将输入数据表示为三个数组。

  • uids用户标识符,整数0..N-1
  • peaks,峰的位置
  • mass,peek 的质量,目前是 1/numer-of-peaks-for-user

这种数据表示允许快速矢量化处理。 例如:

user_data = [[0.1, 0.3], [0.5]]

映射到:

uids = [0, 0, 1] # 2 points for user_data[0], one from user_data[1]
peaks = [0.1, 0.3, 0.5] # serialized user_data
mass = [0.5, 0.5, 1] # scaling factors for each peak, 0.5 means 2 peaks for user 0

代码:

import numpy as np
import matplotlib.pyplot as plt
import time

def mk_bell(D, SIGMA):
    # computes normal distribution wrapped and centered at zero
    x = np.linspace(0, 1, D, endpoint=False);
    x = (x + 0.5) % 1 - 0.5
    bell = np.exp(-0.5*np.square(x / SIGMA))
    return bell / bell.sum()

def user_densities_by_fft(uids, peaks, mass, D, N=None):
    bell = mk_bell(D, 0.05).astype('f4')
    sbell = np.fft.rfft(bell)
    if N is None:
        N = uids.max() + 1
    # ensure that peaks are in [0..1) internal
    peaks = peaks - np.floor(peaks)
    # convert peak location from 0-1 to the indices
    pidx = (D * (peaks + uids)).astype('i4')
    dist = np.bincount(pidx, mass, N * D).reshape(N, D)
    # process all users at once with Convolution Theorem
    sdist = np.fft.rfft(dist)
    sdist *= sbell
    res = np.fft.irfft(sdist)

    return res

def generate_data(N, Pmean):
    # generateor for large data
    data = []
    for n in range(N):
        # select P uniformly from 1..2*Pmean
        P = np.random.randint(2 * Pmean) + 1
        # select peak locations
        chunk = np.random.uniform(size=P)
        data.append(chunk.tolist())
    return data

def make_data_numpy_friendly(data):
    uids = []
    chunks = []
    mass = []
    for uid, peaks in enumerate(data):
        uids.append(np.full(len(peaks), uid))
        mass.append(np.full(len(peaks), 1 / len(peaks)))
        chunks.append(peaks)
    return np.hstack(uids), np.hstack(chunks), np.hstack(mass)




D = 50

# demo for simple multi-distribution
data, N = [[0, .5], [.7, .7, .7, .9], [0.05, 0.3, 0.5, 0.5]], None
uids, peaks, mass = make_data_numpy_friendly(data)
dist = user_densities_by_fft(uids, peaks, mass, D, N)
plt.plot(dist.T)
plt.show()

# the actual measurement
N = 10000
P = 100
data = generate_data(N, P)

tic = time.time()
uids, peaks, mass = make_data_numpy_friendly(data)
toc = time.time()
print(f"make_data_numpy_friendly: {toc - tic}")

tic = time.time()
dist = user_densities_by_fft(uids, peaks, mass, D, N)
toc = time.time()
print(f"user_densities_by_fft: {toc - tic}")

我的 4 核 Haswell 机器上的结果是:

make_data_numpy_friendly: 0.2733159065246582
user_densities_by_fft: 0.04064297676086426

处理数据用了40ms。请注意,将数据处理为 numpy 友好格式所花费的时间是实际计算分布所花费的时间的 6 倍。 Python 在循环方面真的很慢。 因此,我强烈建议首先以 numpy 友好的方式直接生成输入数据。

有一些问题需要解决:

  • 精度,可以通过使用更大的D和下采样
  • 来提高
  • 可以通过加宽尖峰进一步提高峰定位的准确性。
  • 性能,scipy.fft 提供可能更快的 FFT 实现的移动变体