将循环的 pixel(x, y) 转换为 numpy.nditer Iterator

Convert pixel(x, y) for loop to numpy.nditer Iterator

我正在尝试加快某些 pygame 图像处理代码的速度,该代码遍历每个像素并对其进行修改。我正在查看 numpy nditer 函数,但我正在努力弄清楚如何实现它。

    # Iterate though main image
    for x, row in enumerate(main):
        for y, pix1 in enumerate(row):

            # Check the pixel isn't too dark to worry about
            if pix1[0] + pix1[1] + pix1[2] > 10:

                # Calculate distance to light source
                light_distance = np.hypot( x - light_source_pos[0], y - light_source_pos[1] )

                # Calculate light intensity
                light_intensity = (300 - light_distance) / 300

                # Apply light color and intensity to the specular map, apply specular gain then add to main
                main[x][y] += light_color * light_intensity * specular[x][y] * specular_gain

                # Apply light color and intensity to the diffuse map, apply diffuse gain then add to main
                main[x][y] += light_color * light_intensity * diffuse[x][y] * diffuse_gain

我正在迭代由 pygame.surfarray.pixels3d() 生成的图像数据 [x][y][r][g][b] 的数组。该数组不是副本,它是对实际内存内容的引用。

我如何创建遍历 x 和 y 坐标并尽快应用更改的迭代器?

据我所知,按内存顺序对像素进行操作,并将所有内容都放在迭代器循环中会更快吗?

编辑:上面的代码片段是为了更容易理解,但整个脚本都是要点 here。要 运行 它,您需要一些源图像才能使用。

查看代码,似乎实现肯定是可并行化的,因此我们可以有一个矢量化实现。现在,为了消除循环,我们需要在某些地方扩展输入的维度,这将让 broadcasting 发挥作用。

为了便于代码查找和维护,我假定这些缩写为 -

S = specular
D = diffuse
LSP = light_source_pos
LC = light_color
S_gain = specular_gain
D_gain = diffuse_gain

这是向量化问题的一种方法 -

# Vectorize light_distance calculations and thereafter for light_intensity
LD = (np.hypot(np.arange(M)[:,None] - LSP[0], np.arange(N) - LSP[1]))
LI = (300 - LD) / 300

# Vectorized "LC * light_intensity * S[x][y] * S_gain" and 
# "LC * light_intensity * D[x][y] * D_gain" calculations
add_part = (LC*LI[...,None]*S*S_gain) + (LC*LI[...,None]*D*D_gain)

# Get masked places set by "pix1[0] + pix1[1] + pix1[2] > 10", which would be 
# "main.sum(2) > 10". Use mask to add selective elements from add_part into main 
main += (add_part*(main.sum(2)[...,None] > 10))

运行时测试和验证输出

定义函数-

def original_app(main,S,D,LSP,LC,S_gain,D_gain):
    for x, row in enumerate(main):
        for y, pix1 in enumerate(row):
            if pix1[0] + pix1[1] + pix1[2] > 10:
                light_distance = np.hypot( x - LSP[0], y - LSP[1] )
                light_intensity = (300 - light_distance) / 300
                main[x][y] += LC * light_intensity * S[x][y] * S_gain
                main[x][y] += LC * light_intensity * D[x][y] * D_gain


def vectorized_app(main,S,D,LSP,LC,S_gain,D_gain):
    LD = (np.hypot(np.arange(M)[:,None] - LSP[0], np.arange(N) - LSP[1]))
    LI = (300 - LD) / 300
    add_part = (LC*LI[...,None]*S*S_gain) + (LC*LI[...,None]*D*D_gain)
    main += (add_part*(main.sum(2)[...,None] > 10))

运行时间 -

In [38]: # Inputs
    ...: M,N,R = 300,200,3 # Shape as stated in the comments
    ...: main = np.random.rand(M,N,R)*10
    ...: S = np.random.rand(M,N,R)
    ...: D = np.random.rand(M,N,R)
    ...: LSP = [3,10]
    ...: LC = np.array([2,6,3])
    ...: S_gain = 0.45
    ...: D_gain = 0.22
    ...: 
    ...: # Make copies as functions would change those
    ...: mainc1 = main.copy()
    ...: mainc2 = main.copy()
    ...: 

In [39]: original_app(mainc1,S,D,LSP,LC,S_gain,D_gain)

In [40]: vectorized_app(mainc2,S,D,LSP,LC,S_gain,D_gain)

In [41]: np.allclose(mainc1,mainc2) # Verify outputs
Out[41]: True

In [42]: # Make copies again for timing as functions would change those
    ...: mainc1 = main.copy()
    ...: mainc2 = main.copy()
    ...: 

In [43]: %timeit original_app(mainc1,S,D,LSP,LC,S_gain,D_gain)
1 loops, best of 3: 1.28 s per loop

In [44]: %timeit vectorized_app(mainc2,S,D,LSP,LC,S_gain,D_gain)
100 loops, best of 3: 15.4 ms per loop

In [45]: 1280/15.4 # Speedup
Out[45]: 83.11688311688312