如何加速 C 互斥体?

How to speed up C mutex?

我有这个错误的代码。

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

#define MAX 1000

struct TContext {
    const char* Name;
    int* Counter;
    int Mod;
};

void* ThreadFunc(void* arg) {
    struct TContext* ctxt = arg;
    int* counter = ctxt->Counter;
    fprintf(stderr, "This is %s thread\n", ctxt->Name);
    while (*counter < MAX) {
        if (*counter % 2 == ctxt->Mod) {
            printf("%d ", (*counter)++);
        }
    }
    pthread_exit(0);
}

int main()
{
    pthread_t t1;
    pthread_t t2;

    int counter = 0;
    struct TContext ctxt1 = {"even", &counter, 0};
    struct TContext ctxt2 = {"odd", &counter, 1};
    pthread_create(&t1, 0, ThreadFunc, &ctxt1);
    pthread_create(&t2, 0, ThreadFunc, &ctxt2);

    pthread_join(t1, 0);
    pthread_join(t2, 0);
    printf("\n");
    return 0;
}

我的目标是同步它并获得序列 0、1、2、3、4、5...。

我正在尝试以这种方式锁定和解锁互斥量

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

void* ThreadFunc(void* arg) {
  struct TContext* ctxt = arg;
  int* counter = ctxt->Counter;
  fprintf(stderr, "This is %s thread\n", ctxt->Name);
  while (*counter < MAX) {
    if (*counter % 2 == ctxt->Mod) {
      pthread_mutex_lock(&mutex);
      printf("%d ", (*counter)++);
      pthread_mutex_unlock(&mutex);
    }
  }
  pthread_exit(0);
}

但是速度很慢(我一秒就tl了)

如何以更有效的方式同步此代码?或者也许我可以优化 C-mutex?

我建议:

void* ThreadFunc(void* arg) {
    struct TContext* ctxt = arg;
    volatile int* counter = ctxt->Counter;
    fprintf(stderr, "This is %s thread\n", ctxt->Name);

    while (1)
    {
        int count ;

        count = *counter ;     // NB: volatile*

        if (count >= MAX)
          break ;

         if ((count % 2) == ctxt->Mod)
         {
             printf("%d ", count) ;
             *counter = count + 1 ;
         } ;
    } ;
    pthread_exit(0);
}

至少对于 x86/x86_64,这将产生我认为您正在寻找的效果,即两个线程轮流递增计数器。

真正有趣的问题是为什么这有效:-)


后记

上面的代码关键取决于四件事:

  1. 线程之间只共享一个值 -- 计数器,

  2. 计数器同时是数据控制——计数器的ls位指示哪个线程应该继续。

  3. 读取和写入计数器必须是原子的——所以每次读取计数器都会读取最后写入的值(而不是先前和当前写入的某种组合)。

  4. 编译器必须发出代码以实际 read/write 计数器 from/to 内存 循环中。

现在 (1) 和 (2) 是针对这个特定问题的。 (3) 通常适用于 int(尽管可能需要正确对齐)。 (4) 通过将计数器定义为volatile.

来实现

所以,我最初说这会起作用 "for x86/x86_64 at least" 因为我知道 (3) 对于那些设备是正确的,但我也相信它对于许多(大多数?)常见设备都是正确的。

更清晰的实现将定义计数器 _Atomic,如下所示:

#include <stdatomic.h>

void* ThreadFunc(void* arg) {
    struct TContext* ctxt = arg;
    atomic_int* counter = ctxt->Counter;
    fprintf(stderr, "This is %s thread\n", ctxt->Name);

    while (1)
    {
        int count ;

        count = atomic_load_explicit(counter, memory_order_relaxed) ;

        if (count > MAX)        // printing up to and including MAX
          break ;

         if ((count % 2) == ctxt->Mod)
         {
             printf("%d ", count) ;
             atomic_store_explicit(counter, count + 1, memory_order_relaxed) ;
         } ;
    } ;
    pthread_exit(0);
}

这使得 (3) 和 (4) 明确。但请注意 (1) 和 (2) 仍然意味着我们不需要任何内存排序。每次每个线程读取计数器时,bit0 都会告诉它是否 "owns" 计数器。如果它不拥有计数器,则线程循环再次读取它。如果它确实拥有计数器,它会使用该值,然后写入一个新值——因为它将 "ownership" 传递给它 returns 到读取循环(它不能对计数器做任何进一步的事情,直到它 "owns" 又一次)。一旦 MAX+1 被写入计数器,任何线程都不会使用或更改它,所以这也是安全的。

Employed Russian说的对,这里有一个"data race",不过是通过数据依赖解决的,特别是本例


更普遍

上面的代码不是很有用,除非您有其他具有单个共享值的应用程序。但这可以推广,使用 memory_order_acquirememory_order_acquire 原子操作。

假设我们有一些 struct shared,其中包含一些(非平凡的)数据量,一个线程将产生这些数据,另一个线程将使用这些数据。假设我们再次使用 atomic_uint counter(最初为零)来管理对给定 struct shared parcel 的访问。现在我们有一个生产者线程:

void* ThreadProducerFunc(void* arg) 
{
    atomic_uint counter = &count ;     // somehow
    ....
    while (1)
    {
        uint count ;

        do
          count = atomic_load_explicit(counter, memory_order_acquire) ;
        while ((count & 1) == 1) ;

        ... fill the struct shared parcel, somehow ...

        atomic_store_explicit(counter, count + 1, memory_order_release) ;
    } ;
    ....
}

还有一个消费者线程:

void* ThreadConsumerFunc(void* arg) 
{
    atomic_uint counter = &count ;     // somehow
    ....
    while (1)
    {
        uint count ;

        do
          count = atomic_load_explicit(counter, memory_order_acquire) ;
        while ((count & 1) == 0) ;

        ... empty the struct shared parcel, somehow ...

        atomic_store_explicit(counter, count + 1, memory_order_release) ;
    } ;
    ....
}

加载获取操作与存储释放操作同步,因此:

  • 在生产者中:直到生产者有"ownership"(如上),包裹的填充才会开始,然后"complete"(写入对其他线程) 计数更新之前(并且新值对其他线程可见)。

  • 在消费者中:直到消费者有"ownership"(同上)才会开始清空包裹,然后会"complete"(所有读取都会读取来自内存) 计数更新之前(新值对其他线程可见)。

很明显,两个线程都在忙着等待对方。但是对于两个或更多的包裹和计数器,线程可以以较慢的速度进行。


最后 -- x86/x86_64 和 acquire/release

与x86/x86_64,所有内存读取和写入都是隐式获取读取和释放写入。这意味着 atomic_load_explicit(..., memory_order_acquire)atomic_store_explicit(..., memory_order_release) 中的开销为零。

相反,所有读取-修改-写入操作(和 memory_order_seq_cst 操作)在几十个时钟周期内进行开销——30?、50?,如果操作竞争则更多(取决于设备)。

因此,在性能至关重要的情况下,可能值得了解什么是可能的(以及什么不是)。

Chris Halls 稍微传统的方法是:

pthread_cond_t  cv;
pthread_mutex_t lock;

void* ThreadFunc(void* arg) {
    struct TContext* ctxt = arg;
    int* counter = ctxt->Counter;
    fprintf(stderr, "This is %s thread\n", ctxt->Name);
    pthread_mutex_lock(&lock);
    while (*counter < MAX) {
        if (*counter % 2 == ctxt->Mod) {
            printf("%d ", (*counter)++);
            pthread_cond_broadcast(&cv);
        } else {
            pthread_cond_wait(&cv, &lock);
        }
    }
    pthread_mutex_unlock(&lock);
    pthread_exit(0);
}

主要是:

pthread_mutex_init(&lock, 0);
pthread_cond_init(&cv, 0);

创建线程之前的某个地方。这还可以让您添加任意数量的偶​​数 + 奇数线程而不会受到干扰(虽然没有加速,只是求知欲)。

How I can synchronize this code in more effective way?

你不能:代码从根本上效率低下。

问题在于,与同步开销相比,您所做的工作量(递增整数)是微不足道的,因此后者占主导地位。

要解决此问题,您需要为每个 lock/unlock 对做更多的工作。

在实际程序中,您会让每个线程针对每个 lock/unlock 迭代执行 1000 或 10000 "work items"。类似于:

lock;
const int start = *ctx->Counter;
*ctx->Counter += N;
unlock;

for (int j = start; j < start + N; j++) /* do work on j-th iteration here */;

但是你的玩具程序不适合这个。

Or maybe I can optimize C-mutex?

我建议首先尝试实现 正确的 互斥锁。您很快就会发现这绝非微不足道。