Failing to add up elements of an array in parallel with several strides (MPI)
I am trying to add up the elements of an array in parallel. I was given an example of the algorithm, which I followed to add up elements of the array at different strides:
input = [3,10,1,22,8,28,4,53,4,4,0,4,0,0,0,57]
First stride (add every N/2^1 to N/2^1 + N/2^(1+1)):
input = [ 3,10,1,22,8,28,4,53,4,4,0,57,0,0,0,57]
Second stride (add every N/2^2 to N/2^2 + N/2^(2+1)):
input = [3,10,1,22,8,50,4,53,4,57,0,57,0,57,0,57]
Third stride (add every N/2^3 to N/2^3 + N/2^(3+1)):
input = [3,10,11,22,30,50,54,53,57,57,57,57,57,57,57,57]
I wrote code to divide the adding work evenly among my processors. (Note that I am trying to avoid MPI_Scan.)
Each processor has a temp value holding the changed array value, which is sent back to the root with MPI_Gather; the root then updates the whole input array and MPI_Bcasts the input to every processor so it can do the adding work again before entering the next stride.
However, my results are not what I expected. I would appreciate it if someone could tell me what I am doing wrong in my code.
Here is my code: (updated)
int DownPhaseFunction(int* input, int size_per_process, int rank, int totalarray, int size, int* Temp0)
{
    //rank is the id of processor
    //size is the total number of processors
    int temp = 0;
    int index = 0;
    int index0 = 0;

    //First Stride
    if (rank == 0)
    {
        input[(totalarray)-(totalarray/2)+(totalarray/4)-1] += input[(totalarray)-(totalarray/2)-1];
    }
    MPI_Bcast(input, totalarray, MPI_INT, 0, MPI_COMM_WORLD);

    //Start with Second Stride to the end
    for (int i = 4; i < totalarray; i *= 2)
    {
        //if the number of elements to be changed is larger than the total number of processors, do a few more strides
        for (int j = 0; j <= i; j += (size*totalarray/i))
        {
            index = ((rank+1)*totalarray/i) + j;
            if (index != totalarray)
            {
                temp = input[(index+(totalarray/i)/2)-1] + input[index-1];
            }
            else
            {
                temp = input[index-1];
            }
            //Gather the changed elements back to root
            MPI_Gather(&temp, size, MPI_INT, Temp0, size, MPI_INT, 0, MPI_COMM_WORLD);
            //Let root change the changed elements in the input array
            if (rank == 0)
            {
                for (int r = 0; r < size; r++)
                {
                    index0 = ((r+1)*totalarray/i) + j;
                    if (index0 != totalarray)
                    {
                        input[(index0+(totalarray/i)/2-1)] = Temp0[r];
                    }
                }
            }
            //send it back to every processor to do the next stride
            MPI_Bcast(input, totalarray, MPI_INT, 0, MPI_COMM_WORLD);
        }
    }
    return (*input);
}
Each processor has a temp value holding the changed array value, which is sent back to the root with MPI_Gather; the root then updates the whole input array and MPI_Bcasts the input to every processor so it can do the adding work again before entering the next stride.
IMO this design overcomplicates things. I suggest first explicitly setting the boundaries of the input array range that each process is allowed to work on. For an input of 16 it would look like the following:
Process 0 works from [0 to 4[
Process 1 works from [4 to 8[
Process 2 works from [8 to 12[
Process 3 works from [12 to 16[
To compute those ranges one can use the following formulas:
int begin = rank * size_per_process;
int end = (rank + 1) * size_per_process;
To implement the remaining logic, we first create a loop that starts by splitting the array in half and then halves the split size again on every iteration:
int split_size = totalarray/2;
while(split_size > 1){
//....
split_size = split_size/2;
}
We need an extra loop to iterate over the input array with the current split size, namely:
for(int i = split_size; i < totalarray; i+= split_size){
//...
}
Each rank will only be allowed to work on the chunk of the array assigned to that process, namely:
for(int i = split_size; i < totalarray; i+= split_size){
int dest = i + (split_size/2) - 1;
if(begin <= dest && dest < end)
input[dest] += input[i -1];
}
An improved (but less readable) version:
int shift = (split_size/2) - 1;
int dest = ((begin == 0) ? split_size : (split_size/begin) * split_size) + shift;
for(; dest < end; dest += split_size)
input[dest] += input[dest - shift -1];
After each step, all processes send their chunk of the array to the other processes:
MPI_Allgather(MPI_IN_PLACE, size_per_process, MPI_INT, input, size_per_process, MPI_INT, MPI_COMM_WORLD);
The MPI_IN_PLACE ensures that the new input array (resulting from gathering the work done by all processes) replaces the old input array. For an input of 16 elements and 4 processes, processes 0, 1, 2, and 3 will send their elements [0 to 4[, [4 to 8[, [8 to 12[, and [12 to 16[ of the input array, respectively, to all the other processes. Hence, at the end of the MPI_Allgather call, every process has the most up-to-date input array, containing all the parts changed by the processes during the current iteration.
So for input = [3,10,1,22,8,28,4,53,4,4,0,4,0,0,0,57] with 4 processes, the iterations will look like the following:
First step:
Process 2 input[11] += input[7] (input[11] = 4 + 53)
input array: [3, 10, 1, 22, 8, 28, 4, 53, 4, 4, 0, 57, 0, 0, 0, 57]
Second step:
Process 1 input[5] += input[3] (input[5] = 28 + 22)
Process 2 input[9] += input[7] (input[9] = 4 + 53)
Process 3 input[13] += input[11] (input[13] = 0 + 57)
input array: [3, 10, 1, 22, 8, 50, 4, 53, 4, 57, 0, 57, 0, 57, 0, 57]
Third step:
Process 0 input[2] += input[1] (input[2] = 1 + 10)
Process 1 input[4] += input[3] (input[4] = 8 + 22)
Process 1 input[6] += input[5] (input[6] = 4 + 50)
Process 2 input[8] += input[7] (input[8] = 4 + 53)
Process 2 input[10] += input[9] (input[10] = 0 + 57)
Process 3 input[12] += input[11] (input[12] = 0 + 57)
Process 3 input[14] += input[13] (input[14] = 0 + 57)
input array: [3, 10, 11, 22, 30, 50, 54, 53, 57, 57, 57, 57, 57, 57, 57, 57]
A complete running example:
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>

void printArray(int *array, int size){
    int rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    if(rank == 0){
        for(int i = 0; i < size; i++)
            printf("%2d ", array[i]);
        printf("\n");
    }
}

int main(int argc, char **argv){
    int totalarray = 16;
    int rank, size;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    int input[16] = {3,10,1,22,8,28,4,53,4,4,0,4,0,0,0,57};
    printArray(input, totalarray);
    int size_per_process = totalarray/size;
    int begin = rank * size_per_process;
    int end = (rank + 1) * size_per_process;
    int split_size = totalarray/2;
    while(split_size > 1){
        int shift = (split_size/2) - 1;
        int dest = ((begin == 0) ? split_size : (split_size/begin) * split_size) + shift;
        for(; dest < end; dest += split_size)
            input[dest] += input[dest - shift - 1];
        MPI_Allgather(MPI_IN_PLACE, size_per_process, MPI_INT, input, size_per_process, MPI_INT, MPI_COMM_WORLD);
        split_size = split_size/2;
    }
    printArray(input, totalarray);
    MPI_Finalize();
    return 0;
}
Input:  {3,10,1,22,8,28,4,53,4,4,0,4,0,0,0,57}
Output: {3,10,11,22,30,50,54,53,57,57,57,57,57,57,57,57}
Bear in mind that this is a toy example of the proposed design, not bulletproof, production-ready code.