如何检查消息在 MPI 中是否可用?

How to check if messages are available in MPI?

有没有办法查看某个节点是否是消息的接收者?像这样:

// Hypothetical helper sketched by the question (it is not defined anywhere in this text):
// it would report whether a message is waiting for the calling process.
int MPI_HasMessage()

我正在尝试编写一个执行工作的循环:如果另一个节点找到了解决方案,根节点 (rank = 0) 会中断这个循环;否则它会继续运行。

// Pseudocode of the pattern the question is after: perform one unit of work per
// iteration and, after each unit, check whether a message is pending.
// MPI_HasMessage() is the asker's hypothetical check and the MPI_recv(...) arguments
// are deliberately elided, so this fragment is not compilable as written.
for(int i = 0; i < workload; i++)
{
    doWork();
    if(MPI_HasMessage())
    {
        // Receive only once a message is known to be pending.
        MPI_recv(...);
    }
}

您可能对 MPI_Iprobe() 函数感兴趣,该函数允许在不实际接收消息的情况下检查是否有传入的消息。

有一个示例(见原回答中的链接),其中进程 0 使用 MPI_Isend() 向自己发送消息,然后探测该消息,最后使用 MPI_Recv() 接收它。

以下 C 代码使用 MPI_ANY_SOURCE 来探测来自通信器中任何进程的消息。可以通过 mpicc main.c -o main -Wall 编译,并通过 mpirun -np 4 main 运行。

#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>

int main(int argc,char *argv[])
{
    int  size, rank;
    MPI_Init(&argc,&argv);
    MPI_Comm_size(MPI_COMM_WORLD,&size);
    MPI_Comm_rank(MPI_COMM_WORLD,&rank);

    MPI_Status status;

    srand(time(NULL));
    int r=0;    
    if(rank==0){
        r = rand()%size;  // ugly random, not even uniformly distributed

    }
    MPI_Bcast(&r,1,MPI_INT,0,MPI_COMM_WORLD);

    MPI_Request request=MPI_REQUEST_NULL;
    if(rank==r){
        MPI_Isend(&r,1,MPI_INT,0,0,MPI_COMM_WORLD,&request);
    }

    if(rank==0){
        int flag=0;

        // probing the message
        while(!flag)
        {
            // probing on a regular basis, process zero can do anything there
            int i;
            int coffee=42;
            for(i=0;i<1000000;i++){
                coffee+=1;
            }

            MPI_Iprobe( MPI_ANY_SOURCE, 0, MPI_COMM_WORLD, &flag, &status );
        }

        // the status argument retruned by the function could have been used to make sure that
        // MPI_Recv receives the probed message. MPI_Get_count( &status, MPI_INT, &count ); 
        // could have been added to retrieve the length of the message 
        MPI_Recv( &r, 1, MPI_INT,MPI_ANY_SOURCE , 0, MPI_COMM_WORLD, &status );

        printf("process 0 got %d\n",r);
    }


    MPI_Wait(&request, &status);

    MPI_Finalize();
    return 0;
}

Francis 确实正确回答了您的问题。然而,这实际上是 XY 问题(XY-problem)的一个实例。

您的实际问题是:

找到解决方案后如何通知所有进程

这似乎是一个常见问题,并没有简单的答案;请参阅相关的问题和答案。

您完全不需要使用 MPI_Iprobe,通过 MPI_Irecv 预先发布(pre-post)的接收就可以了。我的建议是使用非阻塞集合通信(non-blocking collectives),原文链接的回答中对此有更详细的说明。
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>

/* Exclusive upper bound on the number of attempts made by one find_stuff() call. */
const int iter_max = 10000;
/* An attempt succeeds when rand() % difficulty equals 0. */
const int difficulty = 20000;

/*
 * Simulated search: draws a random number of attempts (0 .. iter_max - 1) and
 * makes that many random draws, stopping early on the first successful one.
 *
 * Returns 1 if some draw succeeded, 0 otherwise.
 */
int find_stuff()
{
    int attempts_left = rand() % iter_max;

    while (attempts_left > 0) {
        int draw = rand() % difficulty;
        if (draw == 0) {
            return 1;
        }
        attempts_left--;
    }
    return 0;
}

/* Tag carried by the zero-length "I found something" messages sent to the root. */
const int stop_tag = 42;
/* Rank that collects stop messages and releases the barrier. */
const int root = 0;

/*
 * Called on the root: checks whether a stop message has arrived and, if so,
 * releases all ranks through the non-blocking barrier.
 *
 * root_recv_stop  pre-posted receive for a stop message
 * all_recv_stop   request used for the root's MPI_Ibarrier call
 * found_count     non-zero when the root itself found something in this iteration
 *
 * Returns 1 when a stop was forwarded, 0 when no stop message has arrived yet.
 */
int forward_stop(MPI_Request* root_recv_stop, MPI_Request* all_recv_stop, int found_count)
{
    MPI_Status sender;
    int stop_arrived;

    if (found_count != 0) {
        // If we find something on the root, we actually wait until we receive our own message.
        MPI_Wait(root_recv_stop, &sender);
        stop_arrived = 1;
    } else {
        MPI_Test(root_recv_stop, &stop_arrived, &sender);
    }

    if (!stop_arrived) {
        return 0;
    }

    printf("Forwarding stop signal from %d\n", sender.MPI_SOURCE);
    MPI_Ibarrier(MPI_COMM_WORLD, all_recv_stop);
    MPI_Wait(all_recv_stop, MPI_STATUS_IGNORE);

    // We must post some additional receives if multiple ranks found something at the same time.
    // The reduction tells the root how many ranks sent a stop message in total.
    MPI_Reduce(MPI_IN_PLACE, &found_count, 1, MPI_INT, MPI_SUM, root, MPI_COMM_WORLD);

    // One stop message has already been consumed above; drain the remaining ones.
    int pending = found_count - 1;
    while (pending > 0) {
        MPI_Recv(NULL, 0, MPI_CHAR, MPI_ANY_SOURCE, stop_tag, MPI_COMM_WORLD, &sender);
        printf("Additional stop from: %d\n", sender.MPI_SOURCE);
        pending--;
    }
    return 1;
}

int main()
{
    MPI_Init(NULL, NULL);

    int rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    srand(rank);

    MPI_Request root_recv_stop;
    MPI_Request all_recv_stop;
    if (rank == root) {
        MPI_Irecv(NULL, 0, MPI_CHAR, MPI_ANY_SOURCE, stop_tag, MPI_COMM_WORLD, &root_recv_stop);
    } else {
        // You may want to use an extra communicator here, to avoid messing with other barriers
        MPI_Ibarrier(MPI_COMM_WORLD, &all_recv_stop);
    }

    while (1) {
        int found = find_stuff();
        if (found) {
            printf("Rank %d found something.\n", rank);
            // Note: We cannot post this as blocking, otherwise there is a deadlock with the reduce
            MPI_Request req;
            MPI_Isend(NULL, 0, MPI_CHAR, root, stop_tag, MPI_COMM_WORLD, &req);
            if (rank != root) {
                // We know that we are going to receive our own stop signal.
                // This avoids running another useless iteration
                MPI_Wait(&all_recv_stop, MPI_STATUS_IGNORE);
                MPI_Reduce(&found, NULL, 1, MPI_INT, MPI_SUM, root, MPI_COMM_WORLD);
                MPI_Wait(&req, MPI_STATUS_IGNORE);
                break;
            }
            MPI_Wait(&req, MPI_STATUS_IGNORE);
        }
        if (rank == root) {
            if (forward_stop(&root_recv_stop, &all_recv_stop, found)) {
                break;
            }
        } else {
            int stop_signal;
            MPI_Test(&all_recv_stop, &stop_signal, MPI_STATUS_IGNORE);
            if (stop_signal)
            {
                MPI_Reduce(&found, NULL, 1, MPI_INT, MPI_SUM, root, MPI_COMM_WORLD);
                printf("Rank %d stopping after receiving signal.\n", rank);
                break;
            }
        }
    };

    MPI_Finalize();
}