互斥条件信号循环如何工作?
How does mutex condition signaling loop works?
我会做一个假设场景,只是为了清楚我需要知道什么。
假设我经常更新一个文件。
我需要通过几个不同的线程来读取和解析这个文件。
每次重写此文件时,我都会唤醒一个条件互斥体,以便其他线程可以做任何他们想做的事情。
我的问题是:
如果我有10000个线程,第一个线程执行会阻塞其他9999个的执行吗?
它是并行工作还是同步工作?
此 post 自第一次 post 编辑以来已被编辑以解决 Jonathan Wakely 在下面的评论,并更好地区分 condition_variable、条件(均称为条件在第一个版本中),以及 wait 函数是如何运行的。然而,同样重要的是对现代 C++ using std::future
、std::thread
和 std::packaged_task
中更好方法的探索,以及关于缓冲和合理线程数的一些讨论。
首先,10,000 个线程是很多线程。除了最高性能的计算机之外,线程调度程序将承受很大的负担。 Windows 下的典型四核工作站会很吃力。这表明某种排队的任务调度是有序的,典型的服务器使用大约 10 个线程接受数千个连接,每个线程为 1,000 个连接提供服务。线程的数量对问题来说确实不重要,但是在这样的任务量中,10,000 个线程是不切实际的。
为了处理同步,互斥锁本身实际上并没有按照您的建议进行。您描述的概念是一种事件对象,可能是一个自动重置事件,它本身就是一个更高级别的概念。 Windows 将它们作为其 API 的一部分,但它们是在 Linux 上构建的(通常用于便携式软件),具有两个原始组件,一个互斥锁和一个条件变量。这些一起创建了自动重置事件,以及其他类型的 "waitable events" 作为 Windows 调用它们。在 C++ 中,这些由 std::mutex
和 std::condition_variable
.
提供
互斥体本身仅提供对公共资源的锁定控制。在那种情况下,我们不是从客户端和服务器(或工作人员和执行人员)的角度来考虑,而是从对等方之间对单一资源的竞争来考虑,该单一资源只能由一个参与者(线程)在同一时间访问时间。互斥量可以阻止执行,但不会根据外部信号释放。如果另一个线程锁定了互斥量,互斥量就会阻塞,并无限期地等待,直到锁的所有者释放它。这不是您在问题中提出的情况。
在您的场景中,有许多 "clients" 和一个 "server" 线程。服务器负责发出信号,表明某些内容已准备好进行处理。所有其他线程都是此设计中的客户端(线程本身并没有使它们成为客户端,我们只是通过它们执行的功能来认为它们是客户端)。在一些讨论中,客户端被称为工作线程。
客户端使用mutex/condition变量对等待信号。这种构造通常采用锁定互斥锁的形式,然后使用该互斥锁等待条件变量。当线程在条件变量上输入 wait
时,互斥体就会解锁。对于等待工作完成的所有客户端线程重复此操作。典型的客户端等待示例是:
std::mutex m;
std::condition_variable cv;
void client_thread()
{
// Wait until server signals data is ready
std::unique_lock<std::mutex> lk(m); // lock the mutex
cv.wait(lk); // wait on cv
// do the work
}
这是显示一起使用的 mutex/conditional 变量的伪代码。 std::condition_variable
wait 函数有两个重载,这是最简单的一个。目的是线程将阻塞,进入空闲状态,直到 condition_variable 发出信号。它不是一个完整的例子,只是为了指出这两个对象是一起使用的。
Johnathan Wakely 的以下评论是基于 wait
不是不确定的这一事实;不能保证呼叫畅通的原因是因为信号。文档称此为 "spurious wakeup",由于 OS 调度的复杂原因偶尔会发生。 Johnathan 提出的要点是,即使唤醒不是因为 condition_variable 收到信号,使用这一对的代码也必须安全运行。
在使用条件变量的说法中,这称为条件(不是 condition_variable)。条件是一个应用程序定义的概念,在文献中通常被说明为布尔值,并且通常是检查布尔值、整数(有时是原子类型)或调用函数 returning 布尔值的结果。有时应用程序定义的关于什么构成真实条件的概念更复杂,但条件的整体效果是确定线程一旦被唤醒,是否应该继续处理,或者应该简单地重复等待。
满足此要求的一种方法是 std::condition_variable::wait 的第二个版本。两者声明:
void wait( std::unique_lock<std::mutex>& lock );
template< class Predicate >
void wait( std::unique_lock<std::mutex>& lock, Predicate pred );
Johnathan 的观点是坚持使用第二个版本。但是,文档描述(并且有两个重载的事实表明)谓词是可选的。 Predicate 是某种函子,通常是 lambda 表达式,如果等待应该解除阻塞则解析为 true,如果等待应该继续等待则解析为 false,并且在锁定下进行评估。 Predicate 是条件的同义词,因为 Predicate 是一种指示 wait 是否应该解除阻塞的真或假的方式。
虽然 Predicate 实际上是可选的,但是 'wait' 在接收到信号之前在阻塞方面并不完美的概念要求如果使用第一个版本,那是因为应用程序是这样构建的虚假唤醒没有任何后果(实际上,是设计的一部分)。
Jonathan 的引文表明 Predicate 是在锁定的情况下求值的,但是以通常不可行的范例的广义形式求值。 std::condition_variable 必须等待锁定的 std::mutex,这可能会保护定义条件的变量,但有时这是不可能的。有时条件更复杂、更外部或更微不足道,以至于 std::mutex 与条件无关。
要了解它在提议的解决方案的上下文中是如何工作的,假设有 10 个客户端线程在等待服务器发出工作即将完成的信号,并且该工作被安排在队列中作为虚拟容器的容器。函子。虚函子可能是这样的:
struct VFunc
{
virtual void operator()(){}
};
template <typename T>
struct VFunctor
{
// Something referring to T, possible std::function
virtual void operator()(){...call the std::function...}
};
typedef std::deque< VFunc > Queue;
上面的伪代码建议使用一个典型的仿函数,它带有一个虚运算符 (),return 为 void 且不采用任何参数,有时称为 "blind call"。建议的关键点是 Queue 可以在不知道调用什么的情况下拥有这些集合,并且 Queue 中的任何 VFunctors 都可以引用任何 std::function 可能能够调用的东西,其中包括其他成员函数对象、lambda、简单函数等。但是,如果只有一个函数签名被调用,也许:
typedef std::deque< std::function<void(void)>> Queue
足够了。
对于任何一种情况,只有当队列中有条目时才会完成工作。
等待,可以使用 class 像:
class AutoResetEvent
{
private:
std::mutex m;
std::condition_variable cv;
bool signalled;
bool signalled_all;
unsigned int wcount;
public:
AutoResetEvent() : wcount( 0 ), signalled(false), signalled_all(false) {}
void SignalAll() { std::unique_lock<std::mutex> l(m);
signalled = true;
signalled_all = true;
cv.notify_all();
}
void SignalOne() { std::unique_lock<std::mutex> l(m);
signalled = true;
cv.notify_one();
}
void Wait() { std::unique_lock<std::mutex> l(m);
++wcount;
while( !signalled )
{
cv.wait(l);
}
--wcount;
if ( signalled_all )
{ if ( wcount == 0 )
{ signalled = false;
signalled_all = false;
}
}
else { signalled = false;
}
}
};
这是可等待对象的标准重置事件类型的伪代码,兼容Windows CreateEvent
和WaitForSingleObject
API,功能基本相同。
所有客户端线程结束于 cv.wait(这可以在 Windows 中超时,使用 Windows API,但不使用 std::condition_variable
).在某些时候,服务器通过调用 Signalxxx 来发出事件信号。您的情况表明 SignalAll()
.
如果调用notify_one,一个等待线程被释放,所有其他线程保持睡眠状态。 notify_all 被调用,然后所有等待该条件的线程被释放去做工作。
以下可能是使用 AutoResetEvent 的示例:
AutoResetEvent evt; // probably not a global
void client()
{
while( !Shutdown ) // assuming some bool to indicate shutdown
{
if ( IsWorkPending() ) DoWork();
evt.Wait();
}
}
void server()
{
// gather data
evt.SignalAll();
}
IsWorkPending()
的使用满足条件的概念,正如 Jonathan Wakely 指出的那样。在指示关闭之前,此循环将处理挂起的工作,否则等待信号。虚假唤醒没有负面影响。 IsWorkPending()
会检查 Queue.size()
,可能是通过一个用 std::mutex 或其他同步机制保护队列的对象。如果工作待处理,DoWork()
将按顺序从队列中弹出条目,直到队列为空。在 return 之后,循环将再次等待信号。
综上所述,mutex 和 condition_variable 的组合与旧的思维方式有关,在 C++11/C++14 时代已经过时了。除非您在使用兼容编译器时遇到问题,否则最好研究 std::promise、std::future 以及 std::async 或 std::thread 与 std::packaged_task 的使用。例如,使用 future、promise、packaged_task 和 thread 可以完全替代上面的讨论。
例如:
// a function for threads to execute
int func()
{
// do some work, return status as result
return result;
}
假设 func 在文件上完成您需要的工作,这些类型定义适用:
typedef std::packaged_task< int() > func_task;
typedef std::future< int > f_int;
typedef std::shared_ptr< f_int > f_int_ptr;
typedef std::vector< f_int_ptr > f_int_vec;
std::future无法复制,所以使用shared_ptr存储,方便在向量中使用,但有多种解决方案。
接下来,一个将这些用于 10 个工作线程的示例
void executive_function()
{
// a vector of future pointers
f_int_vec future_list;
// start some threads
for( int n=0; n < 10; ++n )
{
// a packaged_task calling func
func_task ft( &func );
// get a future from the task as a shared_ptr
f_int_ptr future_ptr( new f_int( ft.get_future() ) );
// store the task for later use
future_list.push_back( future_ptr );
// launch a thread to call task
std::thread( std::move( ft )).detach();
}
// at this point, 10 threads are running
for( auto &d : future_list )
{
// for each future pointer, wait (block if required)
// for each thread's func to return
d->wait();
// get the result of the func return value
int res = d->get();
}
}
这里的重点确实在最后一个range-for循环中。向量存储 packaged_task 提供的期货。这些任务用于启动线程,未来是同步执行的关键。一旦所有线程都为 运行,每个线程都为 "waited on",只需简单调用 future 的 wait 函数,之后即可获得 func 的 return 值。不涉及互斥锁或 condition_variables(据我们所知)。
这让我想到了并行处理文件的主题,无论您如何启动多个线程。如果有一台机器可以处理 10,000 个线程,那么如果每个线程都是一个微不足道的面向文件的操作,就会有相当多的 RAM 资源专门用于文件处理,所有这些都相互重复。根据选择的 API,每个读取操作都有关联的缓冲区。
假设文件为 10 MB,并且有 10,000 个线程开始对其进行操作,其中每个线程使用 4 KB 缓冲区进行处理。结合起来,这表明将有 40 MB 的缓冲区来处理 10 MB 的文件。简单地将文件读入 RAM 并提供对 RAM 中所有线程的只读访问会减少浪费。
由于多个任务在不同时间从文件的不同部分读取可能会导致标准硬盘的严重抖动(闪存源不是这样),如果磁盘缓存不能,则这个概念变得更加复杂赶上。不过,更重要的是,10,000 个线程都在调用系统 API 来读取文件,每个线程都有相当大的开销。
如果源 material 是完全读入 RAM 的候选者,则线程可以专注于 RAM 而不是文件,从而减轻开销,提高性能。线程可以在没有锁的情况下共享对内容的读取访问。
如果源文件太大而无法完全读入 RAM,最好还是分块读取源文件,让线程从共享内存资源中处理该部分,然后移动到下一个块系列。
我会做一个假设场景,只是为了清楚我需要知道什么。
假设我经常更新一个文件。
我需要通过几个不同的线程来读取和解析这个文件。
每次重写此文件时,我都会唤醒一个条件互斥体,以便其他线程可以做任何他们想做的事情。
我的问题是:
如果我有10000个线程,第一个线程执行会阻塞其他9999个的执行吗?
它是并行工作还是同步工作?
此 post 自第一次 post 编辑以来已被编辑以解决 Jonathan Wakely 在下面的评论,并更好地区分 condition_variable、条件(均称为条件在第一个版本中),以及 wait 函数是如何运行的。然而,同样重要的是对现代 C++ using std::future
、std::thread
和 std::packaged_task
中更好方法的探索,以及关于缓冲和合理线程数的一些讨论。
首先,10,000 个线程是很多线程。除了最高性能的计算机之外,线程调度程序将承受很大的负担。 Windows 下的典型四核工作站会很吃力。这表明某种排队的任务调度是有序的,典型的服务器使用大约 10 个线程接受数千个连接,每个线程为 1,000 个连接提供服务。线程的数量对问题来说确实不重要,但是在这样的任务量中,10,000 个线程是不切实际的。
为了处理同步,互斥锁本身实际上并没有按照您的建议进行。您描述的概念是一种事件对象,可能是一个自动重置事件,它本身就是一个更高级别的概念。 Windows 将它们作为其 API 的一部分,但它们是在 Linux 上构建的(通常用于便携式软件),具有两个原始组件,一个互斥锁和一个条件变量。这些一起创建了自动重置事件,以及其他类型的 "waitable events" 作为 Windows 调用它们。在 C++ 中,这些由 std::mutex
和 std::condition_variable
.
互斥体本身仅提供对公共资源的锁定控制。在那种情况下,我们不是从客户端和服务器(或工作人员和执行人员)的角度来考虑,而是从对等方之间对单一资源的竞争来考虑,该单一资源只能由一个参与者(线程)在同一时间访问时间。互斥量可以阻止执行,但不会根据外部信号释放。如果另一个线程锁定了互斥量,互斥量就会阻塞,并无限期地等待,直到锁的所有者释放它。这不是您在问题中提出的情况。
在您的场景中,有许多 "clients" 和一个 "server" 线程。服务器负责发出信号,表明某些内容已准备好进行处理。所有其他线程都是此设计中的客户端(线程本身并没有使它们成为客户端,我们只是通过它们执行的功能来认为它们是客户端)。在一些讨论中,客户端被称为工作线程。
客户端使用mutex/condition变量对等待信号。这种构造通常采用锁定互斥锁的形式,然后使用该互斥锁等待条件变量。当线程在条件变量上输入 wait
时,互斥体就会解锁。对于等待工作完成的所有客户端线程重复此操作。典型的客户端等待示例是:
std::mutex m;
std::condition_variable cv;
void client_thread()
{
// Wait until server signals data is ready
std::unique_lock<std::mutex> lk(m); // lock the mutex
cv.wait(lk); // wait on cv
// do the work
}
这是显示一起使用的 mutex/conditional 变量的伪代码。 std::condition_variable
wait 函数有两个重载,这是最简单的一个。目的是线程将阻塞,进入空闲状态,直到 condition_variable 发出信号。它不是一个完整的例子,只是为了指出这两个对象是一起使用的。
Johnathan Wakely 的以下评论是基于 wait
不是不确定的这一事实;不能保证呼叫畅通的原因是因为信号。文档称此为 "spurious wakeup",由于 OS 调度的复杂原因偶尔会发生。 Johnathan 提出的要点是,即使唤醒不是因为 condition_variable 收到信号,使用这一对的代码也必须安全运行。
在使用条件变量的说法中,这称为条件(不是 condition_variable)。条件是一个应用程序定义的概念,在文献中通常被说明为布尔值,并且通常是检查布尔值、整数(有时是原子类型)或调用函数 returning 布尔值的结果。有时应用程序定义的关于什么构成真实条件的概念更复杂,但条件的整体效果是确定线程一旦被唤醒,是否应该继续处理,或者应该简单地重复等待。
满足此要求的一种方法是 std::condition_variable::wait 的第二个版本。两者声明:
void wait( std::unique_lock<std::mutex>& lock );
template< class Predicate >
void wait( std::unique_lock<std::mutex>& lock, Predicate pred );
Johnathan 的观点是坚持使用第二个版本。但是,文档描述(并且有两个重载的事实表明)谓词是可选的。 Predicate 是某种函子,通常是 lambda 表达式,如果等待应该解除阻塞则解析为 true,如果等待应该继续等待则解析为 false,并且在锁定下进行评估。 Predicate 是条件的同义词,因为 Predicate 是一种指示 wait 是否应该解除阻塞的真或假的方式。
虽然 Predicate 实际上是可选的,但是 'wait' 在接收到信号之前在阻塞方面并不完美的概念要求如果使用第一个版本,那是因为应用程序是这样构建的虚假唤醒没有任何后果(实际上,是设计的一部分)。
Jonathan 的引文表明 Predicate 是在锁定的情况下求值的,但是以通常不可行的范例的广义形式求值。 std::condition_variable 必须等待锁定的 std::mutex,这可能会保护定义条件的变量,但有时这是不可能的。有时条件更复杂、更外部或更微不足道,以至于 std::mutex 与条件无关。
要了解它在提议的解决方案的上下文中是如何工作的,假设有 10 个客户端线程在等待服务器发出工作即将完成的信号,并且该工作被安排在队列中作为虚拟容器的容器。函子。虚函子可能是这样的:
struct VFunc
{
virtual void operator()(){}
};
template <typename T>
struct VFunctor
{
// Something referring to T, possible std::function
virtual void operator()(){...call the std::function...}
};
typedef std::deque< VFunc > Queue;
上面的伪代码建议使用一个典型的仿函数,它带有一个虚运算符 (),return 为 void 且不采用任何参数,有时称为 "blind call"。建议的关键点是 Queue 可以在不知道调用什么的情况下拥有这些集合,并且 Queue 中的任何 VFunctors 都可以引用任何 std::function 可能能够调用的东西,其中包括其他成员函数对象、lambda、简单函数等。但是,如果只有一个函数签名被调用,也许:
typedef std::deque< std::function<void(void)>> Queue
足够了。
对于任何一种情况,只有当队列中有条目时才会完成工作。
等待,可以使用 class 像:
class AutoResetEvent
{
private:
std::mutex m;
std::condition_variable cv;
bool signalled;
bool signalled_all;
unsigned int wcount;
public:
AutoResetEvent() : wcount( 0 ), signalled(false), signalled_all(false) {}
void SignalAll() { std::unique_lock<std::mutex> l(m);
signalled = true;
signalled_all = true;
cv.notify_all();
}
void SignalOne() { std::unique_lock<std::mutex> l(m);
signalled = true;
cv.notify_one();
}
void Wait() { std::unique_lock<std::mutex> l(m);
++wcount;
while( !signalled )
{
cv.wait(l);
}
--wcount;
if ( signalled_all )
{ if ( wcount == 0 )
{ signalled = false;
signalled_all = false;
}
}
else { signalled = false;
}
}
};
这是可等待对象的标准重置事件类型的伪代码,兼容Windows CreateEvent
和WaitForSingleObject
API,功能基本相同。
所有客户端线程结束于 cv.wait(这可以在 Windows 中超时,使用 Windows API,但不使用 std::condition_variable
).在某些时候,服务器通过调用 Signalxxx 来发出事件信号。您的情况表明 SignalAll()
.
如果调用notify_one,一个等待线程被释放,所有其他线程保持睡眠状态。 notify_all 被调用,然后所有等待该条件的线程被释放去做工作。
以下可能是使用 AutoResetEvent 的示例:
AutoResetEvent evt; // probably not a global
void client()
{
while( !Shutdown ) // assuming some bool to indicate shutdown
{
if ( IsWorkPending() ) DoWork();
evt.Wait();
}
}
void server()
{
// gather data
evt.SignalAll();
}
IsWorkPending()
的使用满足条件的概念,正如 Jonathan Wakely 指出的那样。在指示关闭之前,此循环将处理挂起的工作,否则等待信号。虚假唤醒没有负面影响。 IsWorkPending()
会检查 Queue.size()
,可能是通过一个用 std::mutex 或其他同步机制保护队列的对象。如果工作待处理,DoWork()
将按顺序从队列中弹出条目,直到队列为空。在 return 之后,循环将再次等待信号。
综上所述,mutex 和 condition_variable 的组合与旧的思维方式有关,在 C++11/C++14 时代已经过时了。除非您在使用兼容编译器时遇到问题,否则最好研究 std::promise、std::future 以及 std::async 或 std::thread 与 std::packaged_task 的使用。例如,使用 future、promise、packaged_task 和 thread 可以完全替代上面的讨论。
例如:
// a function for threads to execute
int func()
{
// do some work, return status as result
return result;
}
假设 func 在文件上完成您需要的工作,这些类型定义适用:
typedef std::packaged_task< int() > func_task;
typedef std::future< int > f_int;
typedef std::shared_ptr< f_int > f_int_ptr;
typedef std::vector< f_int_ptr > f_int_vec;
std::future无法复制,所以使用shared_ptr存储,方便在向量中使用,但有多种解决方案。
接下来,一个将这些用于 10 个工作线程的示例
void executive_function()
{
// a vector of future pointers
f_int_vec future_list;
// start some threads
for( int n=0; n < 10; ++n )
{
// a packaged_task calling func
func_task ft( &func );
// get a future from the task as a shared_ptr
f_int_ptr future_ptr( new f_int( ft.get_future() ) );
// store the task for later use
future_list.push_back( future_ptr );
// launch a thread to call task
std::thread( std::move( ft )).detach();
}
// at this point, 10 threads are running
for( auto &d : future_list )
{
// for each future pointer, wait (block if required)
// for each thread's func to return
d->wait();
// get the result of the func return value
int res = d->get();
}
}
这里的重点确实在最后一个range-for循环中。向量存储 packaged_task 提供的期货。这些任务用于启动线程,未来是同步执行的关键。一旦所有线程都为 运行,每个线程都为 "waited on",只需简单调用 future 的 wait 函数,之后即可获得 func 的 return 值。不涉及互斥锁或 condition_variables(据我们所知)。
这让我想到了并行处理文件的主题,无论您如何启动多个线程。如果有一台机器可以处理 10,000 个线程,那么如果每个线程都是一个微不足道的面向文件的操作,就会有相当多的 RAM 资源专门用于文件处理,所有这些都相互重复。根据选择的 API,每个读取操作都有关联的缓冲区。
假设文件为 10 MB,并且有 10,000 个线程开始对其进行操作,其中每个线程使用 4 KB 缓冲区进行处理。结合起来,这表明将有 40 MB 的缓冲区来处理 10 MB 的文件。简单地将文件读入 RAM 并提供对 RAM 中所有线程的只读访问会减少浪费。
由于多个任务在不同时间从文件的不同部分读取可能会导致标准硬盘的严重抖动(闪存源不是这样),如果磁盘缓存不能,则这个概念变得更加复杂赶上。不过,更重要的是,10,000 个线程都在调用系统 API 来读取文件,每个线程都有相当大的开销。
如果源 material 是完全读入 RAM 的候选者,则线程可以专注于 RAM 而不是文件,从而减轻开销,提高性能。线程可以在没有锁的情况下共享对内容的读取访问。
如果源文件太大而无法完全读入 RAM,最好还是分块读取源文件,让线程从共享内存资源中处理该部分,然后移动到下一个块系列。