信号处理功能的时间优化

Time optimization of function for signal processing

我有一个程序进行大量迭代(几千到几百万到几亿)。它开始花费相当多的时间(几分钟到几天),尽管我尽了最大的努力来优化它,但我还是有点卡住了。

配置文件:通过控制台调用使用 cProfile

ncalls     tottime  percall  cumtime  percall filename:lineno(function)
    500/1    0.018    0.000  119.860  119.860 {built-in method builtins.exec}
        1    0.006    0.006  119.860  119.860 Simulations_profiling.py:6(<module>)
      6/3    0.802    0.134  108.302   36.101 Simulations_profiling.py:702(optimization)
    38147    0.581    0.000  103.411    0.003 Simulations_profiling.py:270(overlap_duo_combination)
   107691   28.300    0.000  102.504    0.001 Simulations_profiling.py:225(duo_overlap)
 12615015   69.616    0.000   69.616    0.000 {built-in method builtins.round}

第一个问题,前两行是什么?我以为这是被调用的程序。

我将在 if / else 语句中用公差比较替换 round() 方法,从而避免这种时间消耗。我想进一步优化以下2个功能,但我找不到新的方法。

import itertools
import numpy as np

class Signal:
    def __init__(self, fq, t0, tf, width=0.3):
        self.fq = fq                                    # frequency in Hz
        self.width = float(width)                       # cathodic phase width in ms
        self.t0 = t0                                    # Instant of the first pulse in ms
        self.tf = tf                                    # End point in ms

        # List of instant at which a stim pulse is triggered in ms
        self.timeline = np.round(np.arange(t0, self.tf, 1/fq*1000), 3)

    def find_closest_t(self, t):
        val = min(self.timeline, key=lambda x:abs(x-t))
        id = np.where(self.timeline==val)[0][0]

        if val <= t or id == 0:
            return val, id
        else:
            return self.timeline[id-1], id-1

def duo_overlap(S1, S2, perc):

    pulse_id_t1, pulse_id_t2 = [], []

    start = max(S1.t0, S2.t0) - max(S1.width, S2.width)
    if start <= 0:
        start = 0
        start_id_S1 = 0
        start_id_S2 = 0
    else:
        start_id_S1 = S1.find_closest_t(start)[1]
        start_id_S2 = S2.find_closest_t(start)[1]

    stop = min(S1.tf, S2.tf) + max(S1.width, S2.width)

    for i in range(start_id_S1, len(S1.timeline)):
        if S1.timeline[i] > stop:
            break

        for j in range(start_id_S2, len(S2.timeline)):
            if S2.timeline[j] > stop:
                break

            d = round(abs(S2.timeline[j] - S1.timeline[i]), 3)  # Computation of the distance of the 2 point

            if S1.timeline[i] <= S2.timeline[j] and d < S1.width:
                pulse_id_t1.append(i)
                pulse_id_t2.append(j)
                continue

            elif S1.timeline[i] >= S2.timeline[j] and d < S2.width:
                pulse_id_t1.append(i)
                pulse_id_t2.append(j)
                continue

            else:
                continue

    return pulse_id_t1, pulse_id_t2

def overlap_duo_combination(signals, perc=0):

    overlap = dict()
    for i in range(len(signals)):
        overlap[i] = list()

    for subset in itertools.combinations(signals, 2):
        p1, p2 = duo_overlap(subset[0], subset[1], perc = perc)
        overlap[signals.index(subset[0])] += p1
        overlap[signals.index(subset[1])] += p2

    return overlap

程序说明:

我有宽度 Signal.width 和频率 Signal.fq 的方波信号,从 Signal.t0 开始,到 Signal.tf 结束。在 Signal 实例的初始化期间,会计算 timeline:它是一个一维浮点数组,其中 each number corresponds to the instant at which a pulse is triggered.

示例:

IN: Signal(50, 0, 250).timeline
OUT: array([  0.,  20.,  40.,  60.,  80., 100., 120., 140., 160., 180., 200., 220., 240.])

A pulse is triggered at t = 0, t = 20, t = 40, ... Each pulse has a width of 0.3.

duo_overlap() 在输入中获取 2 个信号(对于本示例,我们将固定为 0 的百分比。此函数计算 S1 和 S2 的脉冲 ID(时间轴数组中的 ID ) 相互重叠。

示例:

If a pulse starts at t = 0 for S1 and a pulse starts at t = 0.2 for S2, since 0.2 - 0 = 0.2 < 0.3 (S1.width), the 2 pulses overlap.

我试图通过仅在可能重叠的 ID 上循环来优化此函数 (start_id, stop) computed ahead.But 如您所见,此函数是由于调用次数过多,仍然非常耗时。

最后一个函数,overlap_duo_combination() 将输入中的 N 个信号作为列表(或元组/可迭代)(实际上是 2 <= N <= 6)并创建一个 dict(),其中key 是输入列表中信号的 ID,value 是重叠脉冲 ID 的列表(输入列表中信号的 2 x 2 比较)。

示例:

Input: signals = (S1, S2, S3) The pulse n°2 of S1 overlap with pulse n°3 of S2 and the pulse n°3 of S2 overlap with pulse n°5 of S3.

Output: dict[0] = [2] / dict[1] = [3, 3] / dict[2] = [5]

S2 的 3 弹出两次,因为它将添加第一个图块 duo_overlap() 在 S1 和 S2 上调用,第二次在 S2 和 S3 上调用。 我不想避免重复,因为它是关于有多少不同脉冲重叠的信息(在这种情况下,2 个脉冲与 S2 的脉冲 n°3 重叠)。

您有什么想法、建议、代码或其他任何东西来降低这部分代码的时间复杂度吗?

我目前正在研究 PyCUDA 实现,因为我有 Nvidia 1080 Ti 可供使用,但我不知道 C 语言。 这个调用时执行时间不长但被调用数千次的内部函数是否值得切换到 GPU?

感谢阅读这么长的文章post,感谢您的帮助!

如果我对你的问题的理解正确,你可以通过依赖 numpy 而不是执行所有循环来加速你的 duo_overlap() 函数。

您想从 S1.timeline 中减去 S2.timeline 的所有值,并将差值与信号宽度进行比较。以下函数重复 S1.timeline(重复为列)并从每一行中减去 S2.timeline。因此,行索引对应于 S1.timeline,列索引对应于 S2.timeline.

def duo_overlap_np(S1, S2):
    x = S1.timeline
    y = S2.timeline

    mat = np.repeat(x, y.shape[0]).reshape(x.shape[0],y.shape[0])
    mat = mat - y

    # End of S1 pulse overlaps with start of S2 pulse
    overlap_1 = np.where((0 >= mat) & (mat >= -S1.width))

    # End of S2 pulse overlaps with start of S1 pulse
    overlap_2 = np.where((0 <= mat) & (mat <= S2.width))

    # Flatten the overlap arrays. The first element returned by np.where
    # corresponds to S1, the second to S2
    S1_overlap = np.concatenate([overlap_1[0], overlap_2[0]])
    S2_overlap = np.concatenate([overlap_1[1], overlap_2[1]])

    return S1_overlap, S2_overlap

在我的机器上进行快速速度比较,

S1 = Signal(50, 0, 1000, width=0.3)
S2 = Signal(25.5, 20.2, 1000, width=0.6)

%timeit duo_overlap(S1, S2, 0)
# 7 ms ± 284 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

%timeit duo_overlap_np(S1, S2)
# 38.2 µs ± 2.42 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)