numpy:大量线段/点的快速规则间隔平均值
numpy: fast regularly-spaced average for large numbers of line segments / points
我有许多(约 100 万个)不规则间隔的点 P,沿着一维线。这些标记线段,这样如果点是 {0, x_a, x_b, x_c, x_d, ...},线段从 0 ->x_a、x_a->x_b、x_b->x_c、x_c->x_d等。每个部分也有一个 y 值,我希望将其解释为颜色深度。我需要将这条线绘制成图像,但可能只有(比方说)1000 个像素可用于表示该线的整个长度。当然,这些像素对应于沿线的规则间隔,例如 0..X1、X1..X2、X2..X3 等,其中 X1、X2、X3 是规则间隔的。为了计算出每个像素的颜色,我需要对落在规则间隔像素边界内的所有 y 值取平均值,并根据落在该区间内的线段长度进行加权。也可能存在不包含 P 中任何值的像素,它们只是采用由穿过整个像素的段定义的颜色值。
这似乎是图像分析中可能需要做大量工作的事情。那么这个操作是否有一个名称,在 numpy 中计算这样一组规则间隔的平均 y 值的最快方法是什么?这有点像插值,我猜,只是我不想只取周围两个点的平均值,而是一个规则间隔内所有点的加权平均值(加上一点重叠)。
[编辑 - 添加了最小示例]
假设一条水平线上有 5 个线段,由 [0, 1.1, 2.2, 2.3, 2.8, 4] 分隔(即线从 0 到 4)。假设每个段都采用任意阴影值,例如,我们可以有 5 个阴影值 [0,0.88,0.55,0.11,0.44] - 其中 0 是黑色,1 是白色。然后,如果我想使用 4 个像素绘制它,我需要创建 4 个值,从 0...1、1...2 等,并且期望计算 return 每个值的以下值:
0...1 = 0(这被第一条线段覆盖,0->1.1)
1...2 = 0.1 * 0 + 0.9 * 0.88(1 ... 1.1 被第一条线段覆盖,其余部分被第二条线段覆盖)
2...3 = 0.2 * 0.88, 0.1 * 0.55 + 0.5 * 0.11 + 0.2 * 0.44(这个被第二到第五个线段覆盖了)
3...4 = 0.44(这个被最后一条线段覆盖了,2.8->4)
而如果我想将此数据放入一条 2 像素长的线中,则这 2 个像素将具有以下值:
0...2 = 1.1 / 2 * 0 + 0.9 / 2 * 0.88
2...4 = 0.2 / 2 * 0.88 + 0.1 / 2 * 0.55 + 0.5 / 2 * 0.11 + 1.2 * 0.44
这似乎是 "right" 沿 1d 线进行下采样的方法。我正在寻找一个快速实现(最好是内置的东西),当我(比如说)沿着这条线有一百万个点,并且只有 1000 个(左右)像素来容纳它们时。
鉴于您的关键要求是不需要线性插值,您应该看看使用 scipy.signal.resample。
这会将您的信号转换为频谱,然后转换为沿 x 轴规则间隔的新时间序列。
另请参阅此问题:。
您仍然可以使用线性插值来完成此操作。尽管您的函数是分段常数,但您希望它在很多小间隔内的平均值。某些函数 f(x) 在 a 到 区间内的平均值b 只是它在该范围内的积分除以 a 和 b 之间的差值。分段常数函数的积分将是分段线性函数。所以,假设你有你的数据:
x = [0, 1.1, 2.2, 2.3, 2.8, 4]
y = [0, 0.88, 0.55, 0.11, 0.44]
创建一个函数,它将在 x 的任意值处给出其积分。这里数组 I
将包含每个给定值 x 的积分值,函数 f
是它的线性插值,它将给出任意点的精确值:
I = numpy.zeros_like(x)
I[1:] = numpy.cumsum(numpy.diff(x) * y)
f = scipy.interpolate.interp1d(x, I)
现在评估每个像素的平均值很容易:
pix_x = numpy.linspace(0, 4, 5)
pix_y = (f(pix_x[1:]) - f(pix_x[:-1])) / (pix_x[1:] - pix_x[:-1])
我们可以检查这些数组中的内容:
>>> pix_x
array([0., 1., 2., 3., 4.])
>>> pix_y
array([0. , 0.792, 0.374, 0.44 ])
像素的阴影值现在在 pix_y
中。这些应该与您在上面的示例中给出的值完全匹配。
即使对于很多很多点,这也应该相当快:
def test(x, y):
I = numpy.zeros_like(x)
I[1:] = numpy.cumsum(numpy.diff(x) * y)
f = scipy.interpolate.interp1d(x, I,
bounds_error=False, fill_value=(0, I[-1]))
pix_x = numpy.linspace(0, 1, 1001)
pix_y = (f(pix_x[1:]) - f(pix_x[:-1])) / (pix_x[1:] - pix_x[:-1])
return pix_y
timeit
报告:
225 ms ± 37.6 ms per loop
在我的系统上,当 x 的大小为 1000000(而 y 的大小为 999999)时。请注意,bounds_error=False
和 fill_value=(0, I[-1])
会传递给 interp1d
。这具有假设您的着色函数在 x 值范围之外为零的效果。此外,interp1d
不需要对输入值进行排序;在上面的测试中,我将 x 和 y 作为 0 到 1 之间的均匀随机数数组。但是,如果您确定它们已排序,您可以传递 assume_sorted=True
并且您应该获得速度提升:
20.2 ms ± 377 µs per loop
如您所料,这个问题有一个纯粹的 numpy 解决方案。诀窍是巧妙地混合 np.searchsorted
, which will place your regular grid on the nearest bin of the original, and np.add.reduceat
来计算 bins 的总和:
import numpy as np
def distribute(x, y, n):
"""
Down-samples/interpolates the y-values of each segment across a
domain with `n` points. `x` represents segment endpoints, so should
have one more element than `y`.
"""
y = np.asanyarray(y)
x = np.asanyarray(x)
new_x = np.linspace(x[0], x[-1], n + 1)
# Find the insertion indices
locs = np.searchsorted(x, new_x)[1:]
# create a matrix of indices
indices = np.zeros(2 * n, dtype=np.int)
# Fill it in
dloc = locs[:-1] - 1
indices[2::2] = dloc
indices[1::2] = locs
# This is the sum of every original segment a new segment touches
weighted = np.append(y * np.diff(x), 0)
sums = np.add.reduceat(weighted, indices)[::2]
# Now subtract the adjusted portions from the right end of the sums
sums[:-1] -= (x[dloc + 1] - new_x[1:-1]) * y[dloc]
# Now do the same for the left of each interval
sums[1:] -= (new_x[1:-1] - x[dloc]) * y[dloc]
return new_x, sums / np.diff(new_x)
seg = [0, 1.1, 2.2, 2.3, 2.8, 4]
color = [0, 0.88, 0.55, 0.11, 0.44]
seg, color = distribute(seg, color, 4)
print(seg, color)
结果是
[0. 1. 2. 3. 4.] [0. 0.792 0.374 0.44 ]
这正是您在手动计算中所期望的。
基准测试
我 运行 以下一组基准,以确保 和我的都同意答案,并检查时间安排。我稍微修改了另一个解决方案,使其具有与我相同的界面:
from scipy.interpolate import interp1d
def EE_(x, y, n):
I = np.zeros_like(x)
I[1:] = np.cumsum(np.diff(x) * y)
f = interp1d(x, I, bounds_error=False, fill_value=(0, I[-1]))
pix_x = np.linspace(x[0], x[-1], n + 1)
pix_y = (f(pix_x[1:]) - f(pix_x[:-1])) / (pix_x[1:] - pix_x[:-1])
return pix_x, pix_y
这里是测试台(方法 MadPhysicist
只是从上面的 distribute
函数重命名)。输入始终是 x
的 1001 个元素和 y
的 1000 个元素。输出数字为 5, 10, 100, 1000, 10000:
np.random.seed(0x1234ABCD)
x = np.cumsum(np.random.gamma(3.0, 0.2, size=1001))
y = np.random.uniform(0.0, 1.0, size=1000)
tests = (
MadPhysicist,
EE_,
)
for n in (5, 10, 100, 1000, 10000):
print(f'N = {n}')
results = {test.__name__: test(x, y, n) for test in tests}
for name, (x_out, y_out) in results.items():
print(f'{name}:\n\tx = {x_out}\n\ty = {y_out}')
allsame = np.array([[np.allclose(x1, x2) and np.allclose(y1, y2)
for x2, y2 in results.values()]
for x1, y1 in results.values()])
print()
print(f'Result Match:\n{allsame}')
from IPython import get_ipython
magic = get_ipython().magic
for test in tests:
print(f'{test.__name__}({n}):\n\t', end='')
magic(f'timeit {test.__name__}(x, y, n)')
我将跳过数据和协议打印输出(结果相同),并显示时间:
N = 5
MadPhysicist: 50.6 µs ± 349 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
EE_: 110 µs ± 568 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
N = 10
MadPhysicist: 50.5 µs ± 732 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
EE_: 111 µs ± 635 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
N = 100
MadPhysicist: 54.5 µs ± 284 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
EE_: 114 µs ± 215 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
N = 1000
MadPhysicist: 107 µs ± 5.73 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)
EE_: 148 µs ± 5.11 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)
N = 10000
MadPhysicist: 458 µs ± 2.21 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
EE_: 301 µs ± 4.57 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
您可以看到,在较小的输出大小下,numpy 解决方案要快得多,这可能是因为开销占主导地位。然而,在更多的断点处,scipy 解决方案变得更快。您必须比较不同的输入大小才能真正了解时序如何计算,而不仅仅是不同的输出大小。
我有许多(约 100 万个)不规则间隔的点 P,沿着一维线。这些标记线段,这样如果点是 {0, x_a, x_b, x_c, x_d, ...},线段从 0 ->x_a、x_a->x_b、x_b->x_c、x_c->x_d等。每个部分也有一个 y 值,我希望将其解释为颜色深度。我需要将这条线绘制成图像,但可能只有(比方说)1000 个像素可用于表示该线的整个长度。当然,这些像素对应于沿线的规则间隔,例如 0..X1、X1..X2、X2..X3 等,其中 X1、X2、X3 是规则间隔的。为了计算出每个像素的颜色,我需要对落在规则间隔像素边界内的所有 y 值取平均值,并根据落在该区间内的线段长度进行加权。也可能存在不包含 P 中任何值的像素,它们只是采用由穿过整个像素的段定义的颜色值。
这似乎是图像分析中可能需要做大量工作的事情。那么这个操作是否有一个名称,在 numpy 中计算这样一组规则间隔的平均 y 值的最快方法是什么?这有点像插值,我猜,只是我不想只取周围两个点的平均值,而是一个规则间隔内所有点的加权平均值(加上一点重叠)。
[编辑 - 添加了最小示例]
假设一条水平线上有 5 个线段,由 [0, 1.1, 2.2, 2.3, 2.8, 4] 分隔(即线从 0 到 4)。假设每个段都采用任意阴影值,例如,我们可以有 5 个阴影值 [0,0.88,0.55,0.11,0.44] - 其中 0 是黑色,1 是白色。然后,如果我想使用 4 个像素绘制它,我需要创建 4 个值,从 0...1、1...2 等,并且期望计算 return 每个值的以下值:
0...1 = 0(这被第一条线段覆盖,0->1.1)
1...2 = 0.1 * 0 + 0.9 * 0.88(1 ... 1.1 被第一条线段覆盖,其余部分被第二条线段覆盖)
2...3 = 0.2 * 0.88, 0.1 * 0.55 + 0.5 * 0.11 + 0.2 * 0.44(这个被第二到第五个线段覆盖了)
3...4 = 0.44(这个被最后一条线段覆盖了,2.8->4)
而如果我想将此数据放入一条 2 像素长的线中,则这 2 个像素将具有以下值:
0...2 = 1.1 / 2 * 0 + 0.9 / 2 * 0.88
2...4 = 0.2 / 2 * 0.88 + 0.1 / 2 * 0.55 + 0.5 / 2 * 0.11 + 1.2 * 0.44
这似乎是 "right" 沿 1d 线进行下采样的方法。我正在寻找一个快速实现(最好是内置的东西),当我(比如说)沿着这条线有一百万个点,并且只有 1000 个(左右)像素来容纳它们时。
鉴于您的关键要求是不需要线性插值,您应该看看使用 scipy.signal.resample。
这会将您的信号转换为频谱,然后转换为沿 x 轴规则间隔的新时间序列。
另请参阅此问题:
您仍然可以使用线性插值来完成此操作。尽管您的函数是分段常数,但您希望它在很多小间隔内的平均值。某些函数 f(x) 在 a 到 区间内的平均值b 只是它在该范围内的积分除以 a 和 b 之间的差值。分段常数函数的积分将是分段线性函数。所以,假设你有你的数据:
x = [0, 1.1, 2.2, 2.3, 2.8, 4]
y = [0, 0.88, 0.55, 0.11, 0.44]
创建一个函数,它将在 x 的任意值处给出其积分。这里数组 I
将包含每个给定值 x 的积分值,函数 f
是它的线性插值,它将给出任意点的精确值:
I = numpy.zeros_like(x)
I[1:] = numpy.cumsum(numpy.diff(x) * y)
f = scipy.interpolate.interp1d(x, I)
现在评估每个像素的平均值很容易:
pix_x = numpy.linspace(0, 4, 5)
pix_y = (f(pix_x[1:]) - f(pix_x[:-1])) / (pix_x[1:] - pix_x[:-1])
我们可以检查这些数组中的内容:
>>> pix_x
array([0., 1., 2., 3., 4.])
>>> pix_y
array([0. , 0.792, 0.374, 0.44 ])
像素的阴影值现在在 pix_y
中。这些应该与您在上面的示例中给出的值完全匹配。
即使对于很多很多点,这也应该相当快:
def test(x, y):
I = numpy.zeros_like(x)
I[1:] = numpy.cumsum(numpy.diff(x) * y)
f = scipy.interpolate.interp1d(x, I,
bounds_error=False, fill_value=(0, I[-1]))
pix_x = numpy.linspace(0, 1, 1001)
pix_y = (f(pix_x[1:]) - f(pix_x[:-1])) / (pix_x[1:] - pix_x[:-1])
return pix_y
timeit
报告:
225 ms ± 37.6 ms per loop
在我的系统上,当 x 的大小为 1000000(而 y 的大小为 999999)时。请注意,bounds_error=False
和 fill_value=(0, I[-1])
会传递给 interp1d
。这具有假设您的着色函数在 x 值范围之外为零的效果。此外,interp1d
不需要对输入值进行排序;在上面的测试中,我将 x 和 y 作为 0 到 1 之间的均匀随机数数组。但是,如果您确定它们已排序,您可以传递 assume_sorted=True
并且您应该获得速度提升:
20.2 ms ± 377 µs per loop
如您所料,这个问题有一个纯粹的 numpy 解决方案。诀窍是巧妙地混合 np.searchsorted
, which will place your regular grid on the nearest bin of the original, and np.add.reduceat
来计算 bins 的总和:
import numpy as np
def distribute(x, y, n):
"""
Down-samples/interpolates the y-values of each segment across a
domain with `n` points. `x` represents segment endpoints, so should
have one more element than `y`.
"""
y = np.asanyarray(y)
x = np.asanyarray(x)
new_x = np.linspace(x[0], x[-1], n + 1)
# Find the insertion indices
locs = np.searchsorted(x, new_x)[1:]
# create a matrix of indices
indices = np.zeros(2 * n, dtype=np.int)
# Fill it in
dloc = locs[:-1] - 1
indices[2::2] = dloc
indices[1::2] = locs
# This is the sum of every original segment a new segment touches
weighted = np.append(y * np.diff(x), 0)
sums = np.add.reduceat(weighted, indices)[::2]
# Now subtract the adjusted portions from the right end of the sums
sums[:-1] -= (x[dloc + 1] - new_x[1:-1]) * y[dloc]
# Now do the same for the left of each interval
sums[1:] -= (new_x[1:-1] - x[dloc]) * y[dloc]
return new_x, sums / np.diff(new_x)
seg = [0, 1.1, 2.2, 2.3, 2.8, 4]
color = [0, 0.88, 0.55, 0.11, 0.44]
seg, color = distribute(seg, color, 4)
print(seg, color)
结果是
[0. 1. 2. 3. 4.] [0. 0.792 0.374 0.44 ]
这正是您在手动计算中所期望的。
基准测试
我 运行 以下一组基准,以确保
from scipy.interpolate import interp1d
def EE_(x, y, n):
I = np.zeros_like(x)
I[1:] = np.cumsum(np.diff(x) * y)
f = interp1d(x, I, bounds_error=False, fill_value=(0, I[-1]))
pix_x = np.linspace(x[0], x[-1], n + 1)
pix_y = (f(pix_x[1:]) - f(pix_x[:-1])) / (pix_x[1:] - pix_x[:-1])
return pix_x, pix_y
这里是测试台(方法 MadPhysicist
只是从上面的 distribute
函数重命名)。输入始终是 x
的 1001 个元素和 y
的 1000 个元素。输出数字为 5, 10, 100, 1000, 10000:
np.random.seed(0x1234ABCD)
x = np.cumsum(np.random.gamma(3.0, 0.2, size=1001))
y = np.random.uniform(0.0, 1.0, size=1000)
tests = (
MadPhysicist,
EE_,
)
for n in (5, 10, 100, 1000, 10000):
print(f'N = {n}')
results = {test.__name__: test(x, y, n) for test in tests}
for name, (x_out, y_out) in results.items():
print(f'{name}:\n\tx = {x_out}\n\ty = {y_out}')
allsame = np.array([[np.allclose(x1, x2) and np.allclose(y1, y2)
for x2, y2 in results.values()]
for x1, y1 in results.values()])
print()
print(f'Result Match:\n{allsame}')
from IPython import get_ipython
magic = get_ipython().magic
for test in tests:
print(f'{test.__name__}({n}):\n\t', end='')
magic(f'timeit {test.__name__}(x, y, n)')
我将跳过数据和协议打印输出(结果相同),并显示时间:
N = 5
MadPhysicist: 50.6 µs ± 349 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
EE_: 110 µs ± 568 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
N = 10
MadPhysicist: 50.5 µs ± 732 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
EE_: 111 µs ± 635 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
N = 100
MadPhysicist: 54.5 µs ± 284 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
EE_: 114 µs ± 215 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
N = 1000
MadPhysicist: 107 µs ± 5.73 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)
EE_: 148 µs ± 5.11 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)
N = 10000
MadPhysicist: 458 µs ± 2.21 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
EE_: 301 µs ± 4.57 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
您可以看到,在较小的输出大小下,numpy 解决方案要快得多,这可能是因为开销占主导地位。然而,在更多的断点处,scipy 解决方案变得更快。您必须比较不同的输入大小才能真正了解时序如何计算,而不仅仅是不同的输出大小。