std::atomic vs 线程同步的静态变量

std::atomic vs static variable for thread synchronization

我有一堆线程在处理多个数据项。线程必须按照我将数据交给线程的相同顺序输出结果。即:

Thread #1: give data - start processing
Thread #2: give data - start processing
Thread #3: give data - start processing
...
Thread #n: give data - start processing

结果的检索顺序应与数据传递给线程的顺序相同,而不管哪个线程先完成处理。即:

Thread #1: put data 
Thread #2: put data
...

为了区分线程并管理它们,我给每个线程一个 ID (0,1,2,...,n)。我正在使用 ID 将数据分配给每个线程,以便它可以处理它。

for(int i=0; i<thread_count; i++)
    give_data(i); // i is id and the function knows where to get data from

我希望线程共享一个令牌,该令牌确定预期哪个线程会产生结果。所有线程主体都是相同的,主体看起来像:

while(true){
    auto data = get_data();
    result = process_data(data);
    while(token != this_id) spin;
    put_data(result); // this is a synchronized call 
    update_token(token);
}

我的问题来自 token。我首先尝试了一个普通的引用(int & token),它显然不能工作(我没想到它会)。不管怎样,我使用了一个静态变量,线程并不总是得到最新的。我很惊讶地看到一个线程主宰一切。每当一个线程更新令牌时,它就会失去轮到允许另一个线程放置它的结果等等。但是,我有一个线程占主导地位,就好像令牌总是设置为它自己的 ID 而不是更新。

如果非要我猜的话,我会说这是一个缓存问题。但是,我不确定。

无论如何,我正在考虑使用 std::atomic<int> 作为我的令牌。行得通吗?如果没有,我还应该考虑做什么?同步这些线程的更好方法是什么?

额外:这感觉像是一个糟糕的设计,我不确定如何做得更好。任何建议将不胜感激。

Anyway, I used a static variable and the threads do not always get the latest one. I was surprised to see one thread dominating everything

是的,多个线程访问同一个非同步值并且至少有一个线程写入它是一种数据竞争,根据 C++ 标准,这是未定义的行为。任何事情都有可能发生。

I am thinking of using std::atomic as my token. Would it work?

是的。这将防止令牌上的任何数据竞争。我在你的伪代码中没有看到任何其他直接问题,所以从这个角度来看它看起来不错。

this feels like a bad design and I am not sure how to do it better. Any suggestions would be very much appreciated.

整个设计看起来确实有些怪异,但如果有更简单的表达方式,这取决于您的线程库。例如,使用 OpenMP,您可以一次性执行此操作(give_dataget_data 背后的逻辑太不清楚,无法完整说明):

#pragma omp parallel
{
    int threadCount = omp_get_num_threads();
#pragma omp single
    for (int i = 0; i < threadCount; ++i)
        give_data(i);

#pragma omp ordered for ordered schedule(static)
    for (int i = 0; i < threadCount; ++i)
    {
        auto data = get_data();
        result = process_data(data);
#pragma omp ordered
        put_data(result); // this is a synchronized call 
    }
}

ordered 指令强制 put_data 调用以完全相同的顺序(一个接一个)执行,就好像循环是串行的一样,而线程仍然可以执行先前的操作并行数据处理。

如果您真正想要做的只是让一个大循环的数据处理与有序写入并行,那么使用 OpenMP 事情实际上可能会更容易:

#pragma omp parallel for ordered schedule(static)
for (int i = 0; i < dataItemCount; ++i)
{
    auto data = get_data(i); // whatever this would entail
    auto result = process_data(data);
#pragma omp ordered
    put_data(result); // this is a synchronized call    
}

看起来您并不需要按顺序分布数据项,但如果您真的这样做了,那么这种方法就不会那么容易工作,因为每个有序循环只能有一个有序部分。

Max 的回答很棒。如果我有能力在给定的时间内使用 OpenMP,我会这样做的。但是,我不是,这就是为什么我发布这个问题的答案。

在我以前的设计中,它依赖于线程彼此同步,这似乎不是最好的主意,因为很多地方都可能出错。相反,我决定让经理同步他们的结果(我的想法来自 Max 的最后一个代码片段)。

void give_threads_data(){
    vector<pair<data, promise<result>*> promises(threads.size());
    vector<future<result>> futures(threads.size());
    for(int i=0; i<threads.size(); i++){
        data d = get_data();
        threads[i].put_data(d, promises[i]);
        futures[i] = promises[i].get_future();
    }

    for(int i=0; i<futures.size(); i++){
        result = futures[i].get();
        // handle result
    }
}

这样我就可以像将结果发送到线程一样获得结果。线程主体变得更干净了:

void thread_body(){
    while(true){
        pair<data, promise<result>*> item = queue.get(); // blocking call
        data d = item.first;
        promise<result>* promise = item.second;

        result r = process_data(d);
        promise->set_value(r);
    }
}

没有戏,结果很完美。下次我做线程时,我会考虑 OpenMP。