无法与多个 MPI 步长并行地添加数组中的元素

Failed add up elements in the array in parallel with several strides MPI

我正在尝试并行添加数组中的元素。我得到了一个算法示例,我按照该算法将数组中具有不同步长的元素相加:

input = [3,10,1,22,8,28,4,53,4,4,0,4,0,0,0,57]

First Stride (Add every N/2^1 to N/2^1 + N/2^(1+1): 
input = [ 3,10,1,22,8,28,4,53,4,4,0,57,0,0,0,57]

Second Stride (Add every N/2^2 to N/2^2 + +N/2^(2+1):
input = [3,10,1,22,8,50,4,53,4,57,0,57,0,57,0,57]

Third Stride (Add every N/2^3 to N/2^3 + N/2^(3+1):
input = [3,10,11,22,30,50,54,53,57,57,57,57,57,57,57,57]

我编写代码将添加工作平均分配给我的处理器。 (请注意,我试图避免使用 MPI_Scan)

每个处理器都有一个临时值,这意味着更改后的数组值 MPI_Gather 它返回到根,然后根将更改整个输入数组和 MPI_cast 每个处理器的输入到在进入下一个步幅之前再次进行添加工作。

但是,我的结果似乎并不如我所愿。如果有人能告诉我我在代码中做错了什么,我将不胜感激。

这是我的代码:(更新)

int DownPhaseFunction(int* input, int size_per_process, int rank, int totalarray, int size, int* Temp0)
{
    //rank is the id of processor
    //size is the total number of processors

    int temp =0;
    int index = 0;
    int index0 = 0;

    //First Stride
    if(rank == 0)
    {
      input[(totalarray)-(totalarray/2)+(totalarray/4)-1] += input[(totalarray)-(totalarray/2)-1];
    }

    MPI_Bcast(input,totalarray,MPI_INT,0,MPI_COMM_WORLD);

    //Start with Second Stride to the end
    for (int i=4 ; i<totalarray ; i*=2)
    {
    //if the number of elements to be changed is larger than total size of processors, do a few more strides
    for(int j=0;j<=i;j+=(size*totalarray/i))
    {
        index = ( (rank+1)*totalarray/i) + j;
        if (index != totalarray)
        {
            temp = input[(index+(totalarray/i)/2)-1] + input[index-1];
        }
        else
        {
            temp = input[index-1];
        }

        //Gather the changed elements back to root
        MPI_Gather (&temp, size , MPI_INT, Temp0, size, MPI_INT,0,MPI_COMM_WORLD );

        //Let root change the changed elements in the input array
        if(rank == 0)
            {
            for(int r=0; r<size; r++)
            {
            index0 = ((r+1)*totalarray/i)+j;

            if( (index0) != totalarray)
                {
                input[(index0+(totalarray/i)/2-1)] = Temp0[r];
                }

            }
            }

        //send it back to every processor to do the next stride
        MPI_Bcast(input,totalarray,MPI_INT,0,MPI_COMM_WORLD);
    }
    }

    return(*input);
}

The processor each has a temp value which means the changed array value and MPI_Gather it back to root, then the root will change the whole input array and MPI_Bcast the input to each processor to do the adding work again before entering the next stride.

IMO 这种设计使事情复杂化。我建议首先明确设置允许每个进程工作的 input 数组范围的边界。对于 16 的输入,它将如下所示:

Process 0 works from [0 to 4[
Process 1 works from [4 to 8[
Process 2 works from [8 to 12[
Process 3 works from [12 to 16[

要计算这些范围,可以使用以下公式:

    int begin = rank * size_per_process;
    int end =  (rank + 1) * size_per_process;

为了实现剩余逻辑,我们首先创建一个循环,该循环首先将数组分成两半,并且对于每次迭代,我们都会将数组分成两半。

    int split_size = totalarray/2;
    while(split_size > 1){
        //....
        split_size = split_size/2;
    }

我们需要一个额外的循环来使用当前的拆分大小迭代input数组,即:

        for(int i = split_size; i < totalarray; i+= split_size){
               //... 
        }  

每个rank将只允许在分配给该进程的数组拦截上工作,即:

   for(int i = split_size; i < totalarray; i+= split_size){
       int dest = i + (split_size/2) - 1;
       if(begin <= dest && dest < end)
          input[dest] += input[i -1];
    }

改进(但可读性较差)的版本:

   int shift = (split_size/2) - 1;
   int dest = ((begin == 0) ? split_size : (split_size/begin) * split_size) + shift;
   for(; dest < end; dest += split_size)
      input[dest] += input[dest - shift -1];
  

在每一步之后,所有进程都将他们的数组拦截发送给其他进程:

 MPI_Allgather(MPI_IN_PLACE, size_per_process, MPI_INT, input, size_per_process, MPI_INT, MPI_COMM_WORLD);

MPI_IN_PLACE 确保新的 input 数组(由收集所有进程完成的工作产生)替换旧的 input 数组。对于 16 个元素和 4 个进程的输入,进程 0、1、2 和 3 将发送它们的元素 [0 到 4[,[4 到 8[,[8 到 12[,和 [12 到 16[ input 数组分别指向所有其他进程。因此,在 MPI_Allgather 调用结束时,每个进程都有最新的 input 数组,其中包含当前迭代期间进程更改的所有部分。

因此对于具有 4 个进程的输入 = [3,10,1,22,8,28,4,53,4,4,0,4,0,0,0,57],迭代将如下所示:

第一步:

Process 2 input[11] += input[7] (input[11] = 4 + 53) 

输入数组:[3, 10, 1, 22, 8, 28, 4, 53, 4, 4, 0, 57, 0, 0, 0, 57 ]

第二步:

Process 1 input[5] += input[3] (input[5] = 28 + 22)
Process 2 input[9] += input[7] (input[9] = 4 + 53)
Process 3 input[13] += input[11] (input[13] = 0 + 57)

输入数组:[3, 10, 1, 22, 8, 50, 4, 53, 4, 57, 0 , 57, 0, 57, 0, 57]

第三步:

2, Process 0 input[2] += input[1] (input[2] = 1 + 10)
2, Process 1 input[4] += input[3] (input[4] = 8 + 22)
2, Process 1 input[6] += input[5] (input[6] = 4 + 50)
2, Process 2 input[8] += input[7] (input[8] = 4 + 53)
2, Process 2 input[10] += input[9] (input[10] = 0 + 57)
2, Process 3 input[12] += input[11] (input[12] = 0 + 57)
2, Process 3 input[14] += input[13] (input[14] = 0 + 57)

输入 = [3, 10, 11, 22, 30, 50, 54, 53, 57, 57, 57, 57, 57, 57, 57, 57]

一个完整的运行例子:

#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>

void printArray(int *array, int size){
     int rank;
     MPI_Comm_rank(MPI_COMM_WORLD, &rank);
     if(rank == 0){
    for(int i = 0; i < size; i++)
            printf("%2d ", array[i]);
        printf("\n");
    }
}


int main(int argc, char **argv){
    int totalarray = 16;
    int rank, size;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    int input[16] = {3,10,1,22,8,28,4,53,4,4,0,4,0,0,0,57}; 

    printArray(input, totalarray);

    int size_per_process = totalarray/size;
    int begin = rank * size_per_process;
    int end =  (rank + 1) * size_per_process;
    int split_size = totalarray/2;
    while(split_size > 1){
        int shift = (split_size/2) - 1;
        int dest = ((begin == 0) ? split_size : (split_size/begin) * split_size) + shift;
        for(; dest < end; dest += split_size)
            input[dest] += input[dest - shift -1];
        MPI_Allgather(MPI_IN_PLACE, size_per_process, MPI_INT, input, size_per_process, MPI_INT, MPI_COMM_WORLD);
        split_size = split_size/2;
    }
        
    printArray(input, totalarray);
    MPI_Finalize();

    return 0;
}
       

输入: {3,10,1,22,8,28,4,53,4,4,0,4,0,0,0,57}
输出: {3,10,11,22,30,50,54,53,57,57,57,57,57,57,57,57}

请记住,这是所提议设计的玩具示例;不是防弹准备用于生产类型的代码。