C++11 无锁序列号生成器安全吗?

C++11 Lock-free sequence number generator safe?

目标是在现代 C++ 中实现序列号生成器。上下文处于并发环境中。

要求 #1 class 必须是单例(所有线程通用)

要求#2数字使用的类型是 64 位整数。

要求#3来电可以申请多个号码

要求 #4 此 class 将在能够为呼叫提供服务之前缓存一系列数字。因为它缓存了一个序列,所以它还必须存储上界->最大数量才能return。

要求 #5 最后但并非最不重要的一点是,在启动时(构造函数)和没有可用数字时(n_requested > n_avalaible) ,单例 class 必须查询数据库以获取新序列。此从数据库加载,同时更新 seq_n_ 和 max_seq_n_.

其界面的简要草稿如下:

class singleton_sequence_manager {

public:

    static singleton_sequence_manager& instance() {
        static singleton_sequence_manager s;
        return s;
    }

    std::vector<int64_t> get_sequence(int64_t n_requested);

private:
    singleton_sequence_manager(); //Constructor
    void get_new_db_sequence(); //Gets a new sequence from DB

    int64_t seq_n_;
    int64_t max_seq_n_;
}

示例只是为了阐明用例。 假设在启动时,DB将seq_n_设置为1000,将max_seq_n_设置为1050:

get_sequence.(20); //Gets [1000, 1019]
get_sequence.(20); //Gets [1020, 1039]
get_sequence.(5); //Gets [1040, 1044]
get_sequence.(10); //In order to serve this call, a new sequence must be load from DB

显然,使用锁和 std::mutex 的实现非常简单。

我感兴趣的是使用 std::atomic 和原子操作实现无锁版本。

我的第一次尝试是:

int64_t seq_n_;
int64_t max_seq_n_;

已更改为:

std::atomic<int64_t> seq_n_;
std::atomic<int64_t> max_seq_n_;

从 DB 中获取新序列只是在原子变量中设置新值:

void singleton_sequence_manager::get_new_db_sequence() {
    //Sync call is made to DB
    //Let's just ignore unhappy paths for simplicity
    seq_n_.store( start_of_seq_got_from_db );
    max_seq_n_.store( end_of_seq_got_from_db );
    //At this point, the class can start returning numbers in [seq_n_ : max_seq_n_]
}

现在 get_sequence 函数使用原子比较和交换技术:

std::vector<int64_t> singleton_sequence_manager::get_sequence(int64_t n_requested) {

    bool succeeded{false};
    int64_t current_seq{};
    int64_t next_seq{};

    do {

        current_seq = seq_n_.load();
        do {
            next_seq = current_seq + n_requested + 1;
        }
        while( !seq_n_.compare_exchange_weak( current_seq, next_seq ) );
        //After the CAS, the caller gets the sequence [current_seq:next_seq-1]

        //Check if sequence is in the cached bound.
        if( max_seq_n_.load() > next_seq - 1 )
            succeeded = true;
        else //Needs to load new sequence from DB, and re-calculate again
            get_new_db_sequence();

    }        
    while( !succeeded );

    //Building the response        
    std::vector<int64_t> res{};
    res.resize(n_requested);
    for(int64_t n = current_seq ; n < next_seq ; n++)
        res.push_back(n);

    return res;
}

想法:

您的无锁版本有一个根本性缺陷,因为它将两个独立的原子变量视为一个实体。由于写入 seq_n_max_seq_n_ 是单独的语句,因此它们可以在执行期间分开,导致其中一个与另一个配对时使用的值不正确。

例如,一个线程可以通过 CAS 内部 while 循环(n_requested 对于当前缓存的序列来说太大),然后在检查它是否被缓存之前被挂起。第二个线程可以通过并将 max_seq_n 值更新为更大的值。然后第一个线程恢复,并通过 max_seq_n 检查,因为该值已由第二个线程更新。它现在使用的序列无效。

在两次 store 调用之间的 get_new_db_sequence 中可能会发生类似的事情。

由于您正在写入两个不同的位置(即使在内存中相邻),并且它们不能以原子方式更新(由于 128 位的组合大小不是您的编译器支持的原子大小),写入必须由互斥体保护。

自旋锁只能用于非常短的等待,因为它会消耗 CPU 个周期。典型的用法是使用短自旋锁,如果资源仍然不可用,则使用更昂贵的东西(如互斥锁)等待 CPU 时间。