使用 gcc 在 linux x86-64 上为多个进程原子地递增共享内存中的整数

Atomically incrementing an integer in shared memory for multiple processes on linux x86-64 with gcc

问题:递增计数器并在计数器达到给定值后发出信号(即唤醒下文中阻塞等待、直到计数器变满的函数)的好方法是什么?这很像是在请求一个信号量。涉及的进程通过共享内存 (/dev/shm) 进行通信,我目前希望避免使用库(如 Boost)。

初始解

其他解决方案

以下应该有效。

实施

我包含了大部分代码,但为了简洁起见,我省略了部分代码。

signaling_incrementing_counter.h

#include <atomic>

/**
 * Counter that wakes waiters once it has been incremented up to a limit.
 *
 * The mutex and condition variable are created with PTHREAD_PROCESS_SHARED
 * by init(), so an instance is meant to be placed in memory that several
 * processes map. Every access to `value` in the member functions happens
 * while `mutex` is held.
 */
struct SignalingIncrementingCounter {
public:
    // Stores the limit, initialises the mutex and condition variable, and
    // sets the count to zero. Throws std::runtime_error on failure.
    void init(const int upper_limit_);
    // Sets the count back to zero under the mutex; waiters are not signalled.
    void reset_to_empty();
    // Adds one to the count and broadcasts on the condition variable once the
    // count is at or above the limit.
    void increment(); // only valid when counting up
    // Blocks the caller until the count is at or above the limit. `comment`
    // is only used in the progress message printed while waiting.
    void block_until_full(const char * comment = {""});
private:
    int upper_limit;       // count at which waiters are released
    volatile int value;    // current count, guarded by `mutex`
    pthread_mutex_t mutex; // process-shared, error-checking mutex
    pthread_cond_t cv;     // process-shared condition variable paired with `mutex`

};

signaling_incrementing_counter.cpp

#include <pthread.h>
#include <stdexcept>

#include "signaling_incrementing_counter.h"


/**
 * Prepares the counter for use.
 *
 * Stores the limit, creates a process-shared, error-checking mutex and a
 * process-shared condition variable, and finally sets the count to zero.
 * Every pthread call is checked; on failure nothing that was created here is
 * left behind (attribute objects are destroyed, and the mutex is destroyed
 * again if the condition variable cannot be created).
 *
 * NOTE(review): presumably called exactly once, by the process that creates
 * the shared segment, before any other member is used - confirm with callers.
 *
 * @param upper_limit_ count at which block_until_full() stops waiting.
 * @throws std::runtime_error if any pthread initialisation step fails.
 */
void SignalingIncrementingCounter::init(const int upper_limit_) {

    upper_limit = upper_limit_;
    {
        pthread_mutexattr_t attr;
        if (pthread_mutexattr_init(&attr)) {
            throw std::runtime_error("Error while initializing mutex attributes");
        }

        // Record the first failure instead of throwing immediately so that
        // the attribute object is always destroyed.
        const char * failure = nullptr;
        if (pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED)) {
            failure = "Error while setting sharedp field for mutex";
        } else if (pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK)) {
            failure = "Error while setting type field for mutex";
        } else if (pthread_mutex_init(&mutex, &attr)) {
            failure = "Error while initializing mutex";
        }
        pthread_mutexattr_destroy(&attr);
        if (failure) {
            throw std::runtime_error(failure);
        }
    }

    {
        pthread_condattr_t attr;
        if (pthread_condattr_init(&attr)) {
            pthread_mutex_destroy(&mutex);
            throw std::runtime_error("Error while initializing condition variable attributes");
        }

        const char * failure = nullptr;
        if (pthread_condattr_setpshared(&attr, PTHREAD_PROCESS_SHARED)) {
            failure = "Error while setting sharedp field for condition variable";
        } else if (pthread_cond_init(&cv, &attr)) {
            failure = "Error while initializing condition variable";
        }
        pthread_condattr_destroy(&attr);
        if (failure) {
            // The mutex was created above; do not leave a half-built object.
            pthread_mutex_destroy(&mutex);
            throw std::runtime_error(failure);
        }
    }

    value = 0;
}

/**
 * Sets the count back to zero while holding the mutex.
 *
 * The mutex is an error-checking one, so a failed lock is reported rather
 * than ignored: continuing would write `value` unprotected and then unlock a
 * mutex this thread does not own.
 *
 * @throws std::runtime_error if the mutex cannot be locked.
 */
void SignalingIncrementingCounter::reset_to_empty() {
    if (pthread_mutex_lock(&mutex)) {
        throw std::runtime_error("Unexpected error encountered while grabbing lock.  Investigate.");
    }
    value = 0;
    // No need to signal, because in my use-case, there is no function that unblocks when the value changes to 0
    pthread_mutex_unlock(&mutex);
}

/**
 * Adds one to the count and, once the count is at or above the limit, wakes
 * every thread blocked in block_until_full().
 *
 * The lock result is checked because the mutex is an error-checking one;
 * incrementing without the lock could race with other processes.
 *
 * @throws std::runtime_error if the mutex cannot be locked.
 */
void SignalingIncrementingCounter::increment() {
    if (pthread_mutex_lock(&mutex)) {
        throw std::runtime_error("Unexpected error encountered while grabbing lock.  Investigate.");
    }
    fprintf(stderr, "incrementing\n");
    ++value;
    if (value >= upper_limit) {
        // Broadcast rather than signal: several waiters may be blocked.
        pthread_cond_broadcast(&cv);
    }
    pthread_mutex_unlock(&mutex);
}

/**
 * Blocks the caller until the count is at or above the limit.
 *
 * Waits on the condition variable in 5 second slices; each time a slice
 * expires the current count is printed again and the wait resumes. The mutex
 * is always released before an exception leaves this function, so a failure
 * here cannot leave the shared mutex locked for every other process.
 *
 * @param comment free-form text included in the progress message.
 * @throws std::runtime_error if the mutex cannot be locked, the clock cannot
 *         be read, or the wait fails with anything other than a timeout.
 */
void SignalingIncrementingCounter::block_until_full(const char * comment) {
    struct timespec max_wait = {0, 0};
    if (pthread_mutex_lock(&mutex)) {
        throw std::runtime_error("Unexpected error encountered while grabbing lock.  Investigate.");
    }
    while (value < upper_limit) {
        int val = value;
        printf("blocking until full, value is %i, for %s\n", val, comment);
        // pthread_cond_timedwait takes an absolute deadline. Without a valid
        // "now" the deadline would lie in the past and this loop would spin.
        if (clock_gettime(CLOCK_REALTIME, &max_wait)) {
            pthread_mutex_unlock(&mutex);
            throw std::runtime_error("Unexpected error encountered.  Investigate.");
        }
        max_wait.tv_sec += 5; // wait 5 seconds
        const int timed_wait_rv = pthread_cond_timedwait(&cv, &mutex, &max_wait);
        // A timeout just means "re-check and report again".
        if (timed_wait_rv && timed_wait_rv != ETIMEDOUT) {
            pthread_mutex_unlock(&mutex);
            throw std::runtime_error("Unexpected error encountered.  Investigate.");
        }
    }
    pthread_mutex_unlock(&mutex);
}

使用 int 或 std::atomic 均可。

std::atomic 接口的一大优点是它与 int 的“接口”非常接近,所以两种实现的代码几乎完全一样。可以通过添加 #define USE_INT_IN_SHARED_MEMORY_FOR_SIGNALING_COUNTER true 在下面的两种实现之间切换。

我不太确定在共享内存中静态创建 std::atomic 是否可行,所以我使用 placement new 来构造它。我的猜测是依赖静态分配也能工作,但从技术上讲它可能是未定义的行为。弄清楚这一点超出了我的问题范围,但非常欢迎对该主题发表评论。

signaling_incrementing_counter.h

#include <atomic>
#include "gpu_base_constants.h"

/**
 * Counter that wakes waiters once it has been incremented up to a limit.
 *
 * The mutex and condition variable are created with PTHREAD_PROCESS_SHARED
 * by init(), so an instance is meant to be placed in memory that several
 * processes map. The count is stored either as a plain int or as a
 * std::atomic<int>, selected at compile time by
 * USE_INT_IN_SHARED_MEMORY_FOR_SIGNALING_COUNTER.
 */
struct SignalingIncrementingCounter {
public:
    /**
     * Stores the limit and initialises the synchronisation objects and the
     * count. Once the limit is reached, whatever is waiting on this counter
     * will be signaled and allowed to proceed.
     */
    void init(const int upper_limit_);
    // Sets the count back to zero under the mutex; waiters are not signalled.
    void reset_to_empty();
    // Adds one to the count and broadcasts on the condition variable once the
    // count is at or above the limit.
    void increment(); // only valid when counting up
    // Blocks the caller until the count is at or above the limit. `comment`
    // is only used in the progress message printed while waiting.
    void block_until_full(const char * comment = {""});
    // We don't have a use-case for the block_until_non_full

private:

    int upper_limit; // count at which waiters are released

#if USE_INT_IN_SHARED_MEMORY_FOR_SIGNALING_COUNTER
    volatile int value; // current count, guarded by `mutex`
#else // USE_INT_IN_SHARED_MEMORY_FOR_SIGNALING_COUNTER
    std::atomic<int> value; // current count; (re)constructed in place by init()
    // Result of the placement-new in init().
    // NOTE(review): a raw pointer kept inside a shared mapping is only
    // meaningful in processes that map the segment at the same address, and
    // no member function shown reads it - confirm whether it is needed.
    std::atomic<int> * value_ptr;
#endif // USE_INT_IN_SHARED_MEMORY_FOR_SIGNALING_COUNTER

    pthread_mutex_t mutex; // process-shared, error-checking mutex
    pthread_cond_t cv;     // process-shared condition variable paired with `mutex`

};

signaling_incrementing_counter.cpp

#include <pthread.h>
#include <stdexcept>

#include "signaling_incrementing_counter.h"


/**
 * Prepares the counter for use.
 *
 * Stores the limit, constructs the atomic count in place (atomic variant
 * only), creates a process-shared, error-checking mutex and a process-shared
 * condition variable, and finally zeroes the count through reset_to_empty().
 * Every pthread call is checked; on failure nothing that was created here is
 * left behind (attribute objects are destroyed, and the mutex is destroyed
 * again if the condition variable cannot be created).
 *
 * NOTE(review): presumably called exactly once, by the process that creates
 * the shared segment, before any other member is used - confirm with callers.
 *
 * @param upper_limit_ count at which block_until_full() stops waiting.
 * @throws std::runtime_error if any pthread initialisation step fails or the
 *         freshly created mutex cannot be locked.
 */
void SignalingIncrementingCounter::init(const int upper_limit_) {

    upper_limit = upper_limit_;
    // Must test the same macro the header uses to choose the member layout;
    // otherwise selecting the plain-int variant would still compile the line
    // below, which refers to members that only the atomic variant declares.
#if !USE_INT_IN_SHARED_MEMORY_FOR_SIGNALING_COUNTER
    value_ptr = new(&value) std::atomic<int>(0);
#endif // USE_INT_IN_SHARED_MEMORY_FOR_SIGNALING_COUNTER
    {
        pthread_mutexattr_t attr;
        if (pthread_mutexattr_init(&attr)) {
            throw std::runtime_error("Error while initializing mutex attributes");
        }

        // Record the first failure instead of throwing immediately so that
        // the attribute object is always destroyed.
        const char * failure = nullptr;
        if (pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED)) {
            failure = "Error while setting sharedp field for mutex";
        } else if (pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK)) {
            failure = "Error while setting type field for mutex";
        } else if (pthread_mutex_init(&mutex, &attr)) {
            failure = "Error while initializing mutex";
        }
        pthread_mutexattr_destroy(&attr);
        if (failure) {
            throw std::runtime_error(failure);
        }
    }

    {
        pthread_condattr_t attr;
        if (pthread_condattr_init(&attr)) {
            pthread_mutex_destroy(&mutex);
            throw std::runtime_error("Error while initializing condition variable attributes");
        }

        const char * failure = nullptr;
        if (pthread_condattr_setpshared(&attr, PTHREAD_PROCESS_SHARED)) {
            failure = "Error while setting sharedp field for condition variable";
        } else if (pthread_cond_init(&cv, &attr)) {
            failure = "Error while initializing condition variable";
        }
        pthread_condattr_destroy(&attr);
        if (failure) {
            // The mutex was created above; do not leave a half-built object.
            pthread_mutex_destroy(&mutex);
            throw std::runtime_error(failure);
        }
    }

    reset_to_empty(); // should be done at end, since mutex functions are called
}

void SignalingIncrementingCounter::reset_to_empty() {
    int mutex_rv = pthread_mutex_lock(&mutex);
    if (mutex_rv) {
      throw std::runtime_error("Unexpected error encountered while grabbing lock.  Investigate.");
    }
    value = 0;
    // No need to signal, because there is no function that unblocks when the value changes to 0
    pthread_mutex_unlock(&mutex);
}

void SignalingIncrementingCounter::increment() {
    fprintf(stderr, "incrementing\n");
    int mutex_rv = pthread_mutex_lock(&mutex);
    if (mutex_rv) {
      throw std::runtime_error("Unexpected error encountered while grabbing lock.  Investigate.");
    }
    ++value;
    fprintf(stderr, "incremented\n");

    if (value >= upper_limit) {
        pthread_cond_broadcast(&cv);
    }
    pthread_mutex_unlock(&mutex);
}

void SignalingIncrementingCounter::block_until_full(const char * comment) {
    struct timespec max_wait = {0, 0};
    int mutex_rv = pthread_mutex_lock(&mutex);
    if (mutex_rv) {
      throw std::runtime_error("Unexpected error encountered while grabbing lock.  Investigate.");
    }
    while (value < upper_limit) {
        int val = value;
        printf("blocking during increment until full, value is %i, for %s\n", val, comment);
        /*const int gettime_rv =*/ clock_gettime(CLOCK_REALTIME, &max_wait);
        max_wait.tv_sec += 5;
        const int timed_wait_rv = pthread_cond_timedwait(&cv, &mutex, &max_wait);
        if (timed_wait_rv)
        {
            switch(timed_wait_rv) {
            case ETIMEDOUT:
                break;
            default:
              pthread_mutex_unlock(&mutex);
                throw std::runtime_error("Unexpected error encountered.  Investigate.");
            }
        }
    }
    pthread_mutex_unlock(&mutex);
}