如何定期对格子行的元素求和

How to sum elements of the rows of a lattice periodically

假设我有一个格子

a = np.array([[1, 1, 1, 1],
              [2, 2, 2, 2],
              [3, 3, 3, 3],
              [4, 4, 4, 4]])

我想创建一个函数 func(lattice, start, end) 接受 3 个输入,其中开始和结束是行的索引,函数将对其元素求和。例如,对于 func(a,1,3),它将对这些行的所有元素求和,使得 func(a,1,3) = 2+2+2+2+3+3+3+3+4+4+4+4 = 36.

现在我知道这可以通过切片和 np.sum() 任何方式轻松完成。但关键是我想要 func 做的是也有环绕的能力。即func(a,2,4)应该return3+3+3+3+4+4+4+4+1+1+1+1。 更多的例子是

func(a,3,4) = 4+4+4+4+1+1+1+1
func(a,3,5) = 4+4+4+4+1+1+1+1+2+2+2+2
func(a,0,1) = 1+1+1+1+2+2+2+2

在我的情况下,我永远不会达到再次求和整个事情的地步,即

func(a,3,6) = sum of all elements

更新: 对于我的算法

for i in range(MC_STEPS_NODE):
    sweep(lattice, prob, start_index_master, end_index_master,
                      rows_for_master) 
    # calculate the energy
    Ene = subhamiltonian(lattice, start_index_master, end_index_master)  
    # calculate the magnetisation
    Mag = mag(lattice, start_index_master, end_index_master)
    E1 += Ene
    M1 += Mag
    E2 += Ene*Ene
    M2 += Mag*Mag

        if i % sites_for_master == 0:
            comm.Send([lattice[start_index_master:start_index_master+1], L, MPI.INT],
                              dest=(rank-1)%size, tag=4)
            comm.Recv([lattice[end_index_master:end_index_master+1], L, MPI.INT],
                              source = (rank+1)%size, tag=4)
            start_index_master = (start_index_master + 1)
            end_index_master = (end_index_master + 1)

            if start_index_master > 100:
                start_index_master = start_index_master % L

            if end_index_master > 100:
                end_index_master = end_index_master % L

我想要的函数是 mag() 函数,它计算子晶格的磁化强度,它只是其所有元素的总和。想象一个 LxL lattice 分裂成两个 sublattices,一个属于 master,另一个属于 worker。每个sweep扫过lattice对应的子格,start_index_masterend_index_master确定子格的起始行和结束行。对于每个 i%sites_for_master = 0,索引通过添加 1 向下移动,最终 mod 增加 100,以防止 mpi4py 中的内存溢出。所以你可以想象如果子晶格在主晶格的中心那么start_index_master < end_index_master。最终,子晶格将继续向下移动到 start_index_master < end_index_masterend_index_master > L 的点,因此在这种情况下,如果 start_index_master = 10 对于晶格 L=10,则子晶格的最底行是主格子的第一行([0])。

能量函数:

def subhamiltonian(lattice: np.ndarray, col_len_start: int,
                   col_len_end: int) -> float:

    energy = 0
    for i in range(col_len_start, col_len_end+1):
        for j in range(len(lattice)):
            spin = lattice[i%L, j]
            nb_sum = lattice[(i%L+1) % L, j] + lattice[i%L, (j+1) % L] + \
                     lattice[(i%L-1) % L, j] + lattice[i%L, (j-1) % L]
            energy += -nb_sum*spin

    return energy/4.

这是我计算子晶格能量的函数。

您可以使用 np.arange 创建要求和的索引。

>>> def func(lattice, start, end):
...     rows = lattice.shape[0]
...     return lattice[np.arange(start, end+1) % rows].sum()
... 
>>> func(a,3,4) 
20
>>> func(a,3,5)
28
>>> func(a,0,1)
12

您可以检查停止索引是否回绕以及它是否确实将数组开头的总和添加到结果中。这是高效的,因为它依赖于切片索引,并且只在必要时才做额外的工作。

def func(a, start, stop):
    stop += 1
    result = np.sum(a[start:stop])
    if stop > len(a):
        result += np.sum(a[:stop % len(a)])
    return result

以上版本适用于 stop - start < len(a),即不超过一个完整的环绕。对于任意数量的环绕(即 startstop 的任意值),可以使用以下版本:

def multi_wraps(a, start, stop):
    result = 0
    # Adjust both indices in case the start index wrapped around.
    stop -= (start // len(a)) * len(a)
    start %= len(a)
    stop += 1  # Include the element pointed to by the stop index.
    n_wraps = (stop - start) // len(a)
    if n_wraps > 0:
        result += n_wraps * a.sum()
    stop = start + (stop - start) % len(a)
    result += np.sum(a[start:stop])
    if stop > len(a):
        result += np.sum(a[:stop % len(a)])
    return result

如果 n_wraps > 0 数组的某些部分将被求和两次,这是不必要的低效率,因此我们可以根据需要计算各个数组部分的总和。以下版本最多对每个数组元素求和一次:

def multi_wraps_efficient(a, start, stop):
    # Adjust both indices in case the start index wrapped around.
    stop -= (start // len(a)) * len(a)
    start %= len(a)
    stop += 1  # Include the element pointed to by the stop index.
    n_wraps = (stop - start) // len(a)
    stop = start + (stop - start) % len(a)  # Eliminate the wraps since they will be accounted for separately.
    tail_sum = a[start:stop].sum()
    if stop > len(a):
        head_sum = a[:stop % len(a)].sum()
        if n_wraps > 0:
            remaining_sum = a[stop % len(a):start].sum()
    elif n_wraps > 0:
        head_sum = a[:start].sum()
        remaining_sum = a[stop:].sum()
    result = tail_sum
    if stop > len(a):
        result += head_sum
    if n_wraps > 0:
        result += n_wraps * (head_sum + tail_sum + remaining_sum)
    return result

下图显示了 与上面介绍的两种多重环绕方法之间的性能比较。测试是 运行 在 (1_000, 1_000) 格子上。可以观察到,对于 multi_wraps 方法,从 1 到 2 循环时 运行 时间增加,因为它不必要地对数组求和两次。 multi_wraps_efficient 方法具有相同的性能,无论环绕次数如何,因为它对每个数组元素求和不超过一次。

性能图是使用 perfplot package:

生成的
perfplot.show(
    setup=lambda n: (np.ones(shape=(1_000, 1_000), dtype=int), 400, n*1_000 + 200),
    kernels=[
        lambda x: index_arrays(*x),
        lambda x: multi_wraps(*x),
        lambda x: multi_wraps_efficient(*x),
    ],
    labels=['index_arrays', 'multi_wraps', 'multi_wraps_efficient'],
    n_range=range(1, 11),
    xlabel="Number of wrap-around",
    equality_check=lambda x, y: x == y,
)