如何加速 C 互斥体?
How to speed up C mutex?
我有这个错误的代码。
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#define MAX 1000
struct TContext {
const char* Name;
int* Counter;
int Mod;
};
void* ThreadFunc(void* arg) {
struct TContext* ctxt = arg;
int* counter = ctxt->Counter;
fprintf(stderr, "This is %s thread\n", ctxt->Name);
while (*counter < MAX) {
if (*counter % 2 == ctxt->Mod) {
printf("%d ", (*counter)++);
}
}
pthread_exit(0);
}
int main()
{
pthread_t t1;
pthread_t t2;
int counter = 0;
struct TContext ctxt1 = {"even", &counter, 0};
struct TContext ctxt2 = {"odd", &counter, 1};
pthread_create(&t1, 0, ThreadFunc, &ctxt1);
pthread_create(&t2, 0, ThreadFunc, &ctxt2);
pthread_join(t1, 0);
pthread_join(t2, 0);
printf("\n");
return 0;
}
我的目标是同步它并获得序列 0、1、2、3、4、5...。
我正在尝试以这种方式锁定和解锁互斥量
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
void* ThreadFunc(void* arg) {
struct TContext* ctxt = arg;
int* counter = ctxt->Counter;
fprintf(stderr, "This is %s thread\n", ctxt->Name);
while (*counter < MAX) {
if (*counter % 2 == ctxt->Mod) {
pthread_mutex_lock(&mutex);
printf("%d ", (*counter)++);
pthread_mutex_unlock(&mutex);
}
}
pthread_exit(0);
}
但是速度很慢(我一秒就tl了)
如何以更有效的方式同步此代码?或者也许我可以优化 C-mutex?
我建议:
void* ThreadFunc(void* arg) {
struct TContext* ctxt = arg;
volatile int* counter = ctxt->Counter;
fprintf(stderr, "This is %s thread\n", ctxt->Name);
while (1)
{
int count ;
count = *counter ; // NB: volatile*
if (count >= MAX)
break ;
if ((count % 2) == ctxt->Mod)
{
printf("%d ", count) ;
*counter = count + 1 ;
} ;
} ;
pthread_exit(0);
}
至少对于 x86/x86_64,这将产生我认为您正在寻找的效果,即两个线程轮流递增计数器。
真正有趣的问题是为什么这有效:-)
后记
上面的代码关键取决于四件事:
线程之间只共享一个值 -- 计数器,
计数器同时是数据和控制——计数器的ls位指示哪个线程应该继续。
读取和写入计数器必须是原子的——所以每次读取计数器都会读取最后写入的值(而不是先前和当前写入的某种组合)。
编译器必须发出代码以实际 read/write 计数器 from/to 内存 在 循环中。
现在 (1) 和 (2) 是针对这个特定问题的。 (3) 通常适用于 int
(尽管可能需要正确对齐)。 (4) 通过将计数器定义为volatile
.
来实现
所以,我最初说这会起作用 "for x86/x86_64 at least" 因为我知道 (3) 对于那些设备是正确的,但我也相信它对于许多(大多数?)常见设备都是正确的。
更清晰的实现将定义计数器 _Atomic
,如下所示:
#include <stdatomic.h>
void* ThreadFunc(void* arg) {
struct TContext* ctxt = arg;
atomic_int* counter = ctxt->Counter;
fprintf(stderr, "This is %s thread\n", ctxt->Name);
while (1)
{
int count ;
count = atomic_load_explicit(counter, memory_order_relaxed) ;
if (count > MAX) // printing up to and including MAX
break ;
if ((count % 2) == ctxt->Mod)
{
printf("%d ", count) ;
atomic_store_explicit(counter, count + 1, memory_order_relaxed) ;
} ;
} ;
pthread_exit(0);
}
这使得 (3) 和 (4) 明确。但请注意 (1) 和 (2) 仍然意味着我们不需要任何内存排序。每次每个线程读取计数器时,bit0 都会告诉它是否 "owns" 计数器。如果它不拥有计数器,则线程循环再次读取它。如果它确实拥有计数器,它会使用该值,然后写入一个新值——因为它将 "ownership" 传递给它 returns 到读取循环(它不能对计数器做任何进一步的事情,直到它 "owns" 又一次)。一旦 MAX+1 被写入计数器,任何线程都不会使用或更改它,所以这也是安全的。
兄Employed Russian说的对,这里有一个"data race",不过是通过数据依赖解决的,特别是本例
更普遍
上面的代码不是很有用,除非您有其他具有单个共享值的应用程序。但这可以推广,使用 memory_order_acquire
和 memory_order_acquire
原子操作。
假设我们有一些 struct shared
,其中包含一些(非平凡的)数据量,一个线程将产生这些数据,另一个线程将使用这些数据。假设我们再次使用 atomic_uint counter
(最初为零)来管理对给定 struct shared parcel
的访问。现在我们有一个生产者线程:
void* ThreadProducerFunc(void* arg)
{
atomic_uint counter = &count ; // somehow
....
while (1)
{
uint count ;
do
count = atomic_load_explicit(counter, memory_order_acquire) ;
while ((count & 1) == 1) ;
... fill the struct shared parcel, somehow ...
atomic_store_explicit(counter, count + 1, memory_order_release) ;
} ;
....
}
还有一个消费者线程:
void* ThreadConsumerFunc(void* arg)
{
atomic_uint counter = &count ; // somehow
....
while (1)
{
uint count ;
do
count = atomic_load_explicit(counter, memory_order_acquire) ;
while ((count & 1) == 0) ;
... empty the struct shared parcel, somehow ...
atomic_store_explicit(counter, count + 1, memory_order_release) ;
} ;
....
}
加载获取操作与存储释放操作同步,因此:
在生产者中:直到生产者有"ownership"(如上),包裹的填充才会开始,然后"complete"(写入对其他线程)在 计数更新之前(并且新值对其他线程可见)。
在消费者中:直到消费者有"ownership"(同上)才会开始清空包裹,然后会"complete"(所有读取都会读取来自内存)在 计数更新之前(新值对其他线程可见)。
很明显,两个线程都在忙着等待对方。但是对于两个或更多的包裹和计数器,线程可以以较慢的速度进行。
最后 -- x86/x86_64 和 acquire/release
与x86/x86_64,所有内存读取和写入都是隐式获取读取和释放写入。这意味着 atomic_load_explicit(..., memory_order_acquire)
和 atomic_store_explicit(..., memory_order_release)
中的开销为零。
相反,所有读取-修改-写入操作(和 memory_order_seq_cst 操作)在几十个时钟周期内进行开销——30?、50?,如果操作竞争则更多(取决于设备)。
因此,在性能至关重要的情况下,可能值得了解什么是可能的(以及什么不是)。
比 Chris Halls 稍微传统的方法是:
pthread_cond_t cv;
pthread_mutex_t lock;
void* ThreadFunc(void* arg) {
struct TContext* ctxt = arg;
int* counter = ctxt->Counter;
fprintf(stderr, "This is %s thread\n", ctxt->Name);
pthread_mutex_lock(&lock);
while (*counter < MAX) {
if (*counter % 2 == ctxt->Mod) {
printf("%d ", (*counter)++);
pthread_cond_broadcast(&cv);
} else {
pthread_cond_wait(&cv, &lock);
}
}
pthread_mutex_unlock(&lock);
pthread_exit(0);
}
主要是:
pthread_mutex_init(&lock, 0);
pthread_cond_init(&cv, 0);
创建线程之前的某个地方。这还可以让您添加任意数量的偶数 + 奇数线程而不会受到干扰(虽然没有加速,只是求知欲)。
How I can synchronize this code in more effective way?
你不能:代码从根本上效率低下。
问题在于,与同步开销相比,您所做的工作量(递增整数)是微不足道的,因此后者占主导地位。
要解决此问题,您需要为每个 lock/unlock 对做更多的工作。
在实际程序中,您会让每个线程针对每个 lock/unlock 迭代执行 1000 或 10000 "work items"。类似于:
lock;
const int start = *ctx->Counter;
*ctx->Counter += N;
unlock;
for (int j = start; j < start + N; j++) /* do work on j-th iteration here */;
但是你的玩具程序不适合这个。
Or maybe I can optimize C-mutex?
我建议首先尝试实现 正确的 互斥锁。您很快就会发现这绝非微不足道。
我有这个错误的代码。
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#define MAX 1000
struct TContext {
const char* Name;
int* Counter;
int Mod;
};
void* ThreadFunc(void* arg) {
struct TContext* ctxt = arg;
int* counter = ctxt->Counter;
fprintf(stderr, "This is %s thread\n", ctxt->Name);
while (*counter < MAX) {
if (*counter % 2 == ctxt->Mod) {
printf("%d ", (*counter)++);
}
}
pthread_exit(0);
}
int main()
{
pthread_t t1;
pthread_t t2;
int counter = 0;
struct TContext ctxt1 = {"even", &counter, 0};
struct TContext ctxt2 = {"odd", &counter, 1};
pthread_create(&t1, 0, ThreadFunc, &ctxt1);
pthread_create(&t2, 0, ThreadFunc, &ctxt2);
pthread_join(t1, 0);
pthread_join(t2, 0);
printf("\n");
return 0;
}
我的目标是同步它并获得序列 0、1、2、3、4、5...。
我正在尝试以这种方式锁定和解锁互斥量
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
void* ThreadFunc(void* arg) {
struct TContext* ctxt = arg;
int* counter = ctxt->Counter;
fprintf(stderr, "This is %s thread\n", ctxt->Name);
while (*counter < MAX) {
if (*counter % 2 == ctxt->Mod) {
pthread_mutex_lock(&mutex);
printf("%d ", (*counter)++);
pthread_mutex_unlock(&mutex);
}
}
pthread_exit(0);
}
但是速度很慢(我一秒就tl了)
如何以更有效的方式同步此代码?或者也许我可以优化 C-mutex?
我建议:
void* ThreadFunc(void* arg) {
struct TContext* ctxt = arg;
volatile int* counter = ctxt->Counter;
fprintf(stderr, "This is %s thread\n", ctxt->Name);
while (1)
{
int count ;
count = *counter ; // NB: volatile*
if (count >= MAX)
break ;
if ((count % 2) == ctxt->Mod)
{
printf("%d ", count) ;
*counter = count + 1 ;
} ;
} ;
pthread_exit(0);
}
至少对于 x86/x86_64,这将产生我认为您正在寻找的效果,即两个线程轮流递增计数器。
真正有趣的问题是为什么这有效:-)
后记
上面的代码关键取决于四件事:
线程之间只共享一个值 -- 计数器,
计数器同时是数据和控制——计数器的ls位指示哪个线程应该继续。
读取和写入计数器必须是原子的——所以每次读取计数器都会读取最后写入的值(而不是先前和当前写入的某种组合)。
编译器必须发出代码以实际 read/write 计数器 from/to 内存 在 循环中。
现在 (1) 和 (2) 是针对这个特定问题的。 (3) 通常适用于 int
(尽管可能需要正确对齐)。 (4) 通过将计数器定义为volatile
.
所以,我最初说这会起作用 "for x86/x86_64 at least" 因为我知道 (3) 对于那些设备是正确的,但我也相信它对于许多(大多数?)常见设备都是正确的。
更清晰的实现将定义计数器 _Atomic
,如下所示:
#include <stdatomic.h>
void* ThreadFunc(void* arg) {
struct TContext* ctxt = arg;
atomic_int* counter = ctxt->Counter;
fprintf(stderr, "This is %s thread\n", ctxt->Name);
while (1)
{
int count ;
count = atomic_load_explicit(counter, memory_order_relaxed) ;
if (count > MAX) // printing up to and including MAX
break ;
if ((count % 2) == ctxt->Mod)
{
printf("%d ", count) ;
atomic_store_explicit(counter, count + 1, memory_order_relaxed) ;
} ;
} ;
pthread_exit(0);
}
这使得 (3) 和 (4) 明确。但请注意 (1) 和 (2) 仍然意味着我们不需要任何内存排序。每次每个线程读取计数器时,bit0 都会告诉它是否 "owns" 计数器。如果它不拥有计数器,则线程循环再次读取它。如果它确实拥有计数器,它会使用该值,然后写入一个新值——因为它将 "ownership" 传递给它 returns 到读取循环(它不能对计数器做任何进一步的事情,直到它 "owns" 又一次)。一旦 MAX+1 被写入计数器,任何线程都不会使用或更改它,所以这也是安全的。
兄Employed Russian说的对,这里有一个"data race",不过是通过数据依赖解决的,特别是本例
更普遍
上面的代码不是很有用,除非您有其他具有单个共享值的应用程序。但这可以推广,使用 memory_order_acquire
和 memory_order_acquire
原子操作。
假设我们有一些 struct shared
,其中包含一些(非平凡的)数据量,一个线程将产生这些数据,另一个线程将使用这些数据。假设我们再次使用 atomic_uint counter
(最初为零)来管理对给定 struct shared parcel
的访问。现在我们有一个生产者线程:
void* ThreadProducerFunc(void* arg)
{
atomic_uint counter = &count ; // somehow
....
while (1)
{
uint count ;
do
count = atomic_load_explicit(counter, memory_order_acquire) ;
while ((count & 1) == 1) ;
... fill the struct shared parcel, somehow ...
atomic_store_explicit(counter, count + 1, memory_order_release) ;
} ;
....
}
还有一个消费者线程:
void* ThreadConsumerFunc(void* arg)
{
atomic_uint counter = &count ; // somehow
....
while (1)
{
uint count ;
do
count = atomic_load_explicit(counter, memory_order_acquire) ;
while ((count & 1) == 0) ;
... empty the struct shared parcel, somehow ...
atomic_store_explicit(counter, count + 1, memory_order_release) ;
} ;
....
}
加载获取操作与存储释放操作同步,因此:
在生产者中:直到生产者有"ownership"(如上),包裹的填充才会开始,然后"complete"(写入对其他线程)在 计数更新之前(并且新值对其他线程可见)。
在消费者中:直到消费者有"ownership"(同上)才会开始清空包裹,然后会"complete"(所有读取都会读取来自内存)在 计数更新之前(新值对其他线程可见)。
很明显,两个线程都在忙着等待对方。但是对于两个或更多的包裹和计数器,线程可以以较慢的速度进行。
最后 -- x86/x86_64 和 acquire/release
与x86/x86_64,所有内存读取和写入都是隐式获取读取和释放写入。这意味着 atomic_load_explicit(..., memory_order_acquire)
和 atomic_store_explicit(..., memory_order_release)
中的开销为零。
相反,所有读取-修改-写入操作(和 memory_order_seq_cst 操作)在几十个时钟周期内进行开销——30?、50?,如果操作竞争则更多(取决于设备)。
因此,在性能至关重要的情况下,可能值得了解什么是可能的(以及什么不是)。
比 Chris Halls 稍微传统的方法是:
pthread_cond_t cv;
pthread_mutex_t lock;
void* ThreadFunc(void* arg) {
struct TContext* ctxt = arg;
int* counter = ctxt->Counter;
fprintf(stderr, "This is %s thread\n", ctxt->Name);
pthread_mutex_lock(&lock);
while (*counter < MAX) {
if (*counter % 2 == ctxt->Mod) {
printf("%d ", (*counter)++);
pthread_cond_broadcast(&cv);
} else {
pthread_cond_wait(&cv, &lock);
}
}
pthread_mutex_unlock(&lock);
pthread_exit(0);
}
主要是:
pthread_mutex_init(&lock, 0);
pthread_cond_init(&cv, 0);
创建线程之前的某个地方。这还可以让您添加任意数量的偶数 + 奇数线程而不会受到干扰(虽然没有加速,只是求知欲)。
How I can synchronize this code in more effective way?
你不能:代码从根本上效率低下。
问题在于,与同步开销相比,您所做的工作量(递增整数)是微不足道的,因此后者占主导地位。
要解决此问题,您需要为每个 lock/unlock 对做更多的工作。
在实际程序中,您会让每个线程针对每个 lock/unlock 迭代执行 1000 或 10000 "work items"。类似于:
lock;
const int start = *ctx->Counter;
*ctx->Counter += N;
unlock;
for (int j = start; j < start + N; j++) /* do work on j-th iteration here */;
但是你的玩具程序不适合这个。
Or maybe I can optimize C-mutex?
我建议首先尝试实现 正确的 互斥锁。您很快就会发现这绝非微不足道。