c中多线程服务器的设计
Design of multi-threaded server in c
尝试在 linux 上实现具有并发支持的简单回显服务器时。
使用了以下方法:
- 使用
pthread
函数创建线程池,并维护在链表中。它在进程启动时创建,并在进程终止时销毁。
- 主线程将接受请求,并使用
POSIX message queue
存储接受的套接字文件描述符。
- 池中的线程循环读取消息队列,并处理它收到的请求,当没有请求时,它会阻塞。
该程序现在似乎可以运行了。
问题是:
- 中间用
message queue
合适吗,够不够高效?
- 完成需要处理来自多个客户端的并发请求的线程工具的一般方法是什么?
- 如果让池中的线程循环并阻塞从消息队列中检索消息是不合适的,那么如何向线程传递请求?
这对我来说似乎不必要地复杂。多线程服务器的常用方法是:
- 在线程进程中创建一个listen-socket
- 在一个话题中接受 client-connections
- 对于每个接受的客户端连接,创建一个新线程,它接收相应的文件描述符并完成工作
- 工作线程在完全处理后关闭客户端连接
我认为在此处预先填充 thread-pool 没有多大好处。
如果你真的想要线程池:
我只想使用链表来接受连接,并使用 pthread_mutex
来同步对它的访问:
- listener-process 将客户端 fds 排在列表的尾部。
- 客户端在头部出队。
如果队列为空,线程可以等待变量 (pthread_cond_wait) 并在连接可用时收到侦听器进程 (pthread_cond_signal) 的通知。
另一个选择
根据处理请求的复杂性,可以选择使服务器 single-threaded,即在一个线程中处理所有连接。这完全消除了 context-switches,因此性能非常好。
一个缺点是,只使用了一个CPU-core。为了改善这一点,可以使用 hybrid-model:
- 每个核心创建一个 worker-thread。
- 每个线程同时处理 n 个连接。
但是,您必须实施机制以在工作人员之间公平地分配工作。
除了使用pthread_mutex,你会想要使用pthread_cond_t(pthread条件),这将允许你让线程池中的线程在它们实际上没有做的时候进入休眠状态工作。否则,如果他们坐在那里循环检查工作队列中的某些内容,您将浪费计算周期。
我肯定会考虑使用 C++ 而不是纯 C。我建议这样做的原因是在 C++ 中你可以使用模板。使用纯虚拟基础 class(我们称它为:"vtask"),您可以创建接受参数的模板化派生 classes 并在调用重载的 operator() 时插入参数,允许在您的任务中获得更多更多功能:
//============================================================================//
void* thread_pool::execute_thread()
{
vtask* task = NULL;
while(true)
{
//--------------------------------------------------------------------//
// Try to pick a task
m_task_lock.lock();
//--------------------------------------------------------------------//
// We need to put condition.wait() in a loop for two reasons:
// 1. There can be spurious wake-ups (due to signal/ENITR)
// 2. When mutex is released for waiting, another thread can be waken up
// from a signal/broadcast and that thread can mess up the condition.
// So when the current thread wakes up the condition may no longer be
// actually true!
while ((m_pool_state != state::STOPPED) && (m_main_tasks.empty()))
{
// Wait until there is a task in the queue
// Unlock mutex while wait, then lock it back when signaled
m_task_cond.wait(m_task_lock.base_mutex_ptr());
}
// If the thread was waked to notify process shutdown, return from here
if (m_pool_state == state::STOPPED)
{
//m_has_exited.
m_task_lock.unlock();
//----------------------------------------------------------------//
if(mad::details::allocator_list_tl::get_allocator_list_if_exists() &&
tids.find(CORETHREADSELF()) != tids.end())
mad::details::allocator_list_tl::get_allocator_list()
->Destroy(tids.find(CORETHREADSELF())->second, 1);
//----------------------------------------------------------------//
CORETHREADEXIT(NULL);
}
task = m_main_tasks.front();
m_main_tasks.pop_front();
//--------------------------------------------------------------------//
//run(task);
// Unlock
m_task_lock.unlock();
//--------------------------------------------------------------------//
// execute the task
run(task);
m_task_count -= 1;
m_join_lock.lock();
m_join_cond.signal();
m_join_lock.unlock();
//--------------------------------------------------------------------//
}
return NULL;
}
//============================================================================//
int thread_pool::add_task(vtask* task)
{
#ifndef ENABLE_THREADING
run(task);
return 0;
#endif
if(!is_alive_flag)
{
run(task);
return 0;
}
// do outside of lock because is thread-safe and needs to be updated as
// soon as possible
m_task_count += 1;
m_task_lock.lock();
// if the thread pool hasn't been initialize, initialize it
if(m_pool_state == state::NONINIT)
initialize_threadpool();
// TODO: put a limit on how many tasks can be added at most
m_main_tasks.push_back(task);
// wake up one thread that is waiting for a task to be available
m_task_cond.signal();
m_task_lock.unlock();
return 0;
}
//============================================================================//
void thread_pool::run(vtask*& task)
{
(*task)();
if(task->force_delete())
{
delete task;
task = 0;
} else {
if(task->get() && !task->is_stored_elsewhere())
save_task(task);
else if(!task->is_stored_elsewhere())
{
delete task;
task = 0;
}
}
}
在上面,每个创建的线程运行 execute_thread() 直到 m_pool_state 设置为 state::STOPPED。您锁定 m_task_lock,如果状态不是 STOPPED 且列表为空,则将 m_task_lock 传递给您的条件,这会使线程休眠并释放锁。您创建任务(未显示),添加任务(m_task_count 是一个原子,顺便说一下,这就是它是线程安全的原因)。在添加任务期间,发出条件信号以唤醒线程,线程从[=35=之后的execute_thread()的m_task_cond.wait(m_task_lock.base_mutex_ptr())部分继续进行] 已获取并锁定。
注意:这是一个高度自定义的实现,它将大部分 pthread functions/objects 封装到 C++ classes 中,因此 copy-and-pasting 将无法正常工作...抱歉。并且 w.r.t。 thread_pool::run(),除非您担心 return 值,否则 (*task)() 行就是您所需要的。
希望对您有所帮助。
编辑:m_join_* 参考资料用于检查是否所有任务都已完成。主线程处于类似的条件等待状态,检查是否所有任务都已完成,因为这对于我在继续之前使用此实现的应用程序是必要的。
尝试在 linux 上实现具有并发支持的简单回显服务器时。
使用了以下方法:
- 使用
pthread
函数创建线程池,并维护在链表中。它在进程启动时创建,并在进程终止时销毁。 - 主线程将接受请求,并使用
POSIX message queue
存储接受的套接字文件描述符。 - 池中的线程循环读取消息队列,并处理它收到的请求,当没有请求时,它会阻塞。
该程序现在似乎可以运行了。
问题是:
- 中间用
message queue
合适吗,够不够高效? - 完成需要处理来自多个客户端的并发请求的线程工具的一般方法是什么?
- 如果让池中的线程循环并阻塞从消息队列中检索消息是不合适的,那么如何向线程传递请求?
这对我来说似乎不必要地复杂。多线程服务器的常用方法是:
- 在线程进程中创建一个listen-socket
- 在一个话题中接受 client-connections
- 对于每个接受的客户端连接,创建一个新线程,它接收相应的文件描述符并完成工作
- 工作线程在完全处理后关闭客户端连接
我认为在此处预先填充 thread-pool 没有多大好处。
如果你真的想要线程池:
我只想使用链表来接受连接,并使用 pthread_mutex
来同步对它的访问:
- listener-process 将客户端 fds 排在列表的尾部。
- 客户端在头部出队。
如果队列为空,线程可以等待变量 (pthread_cond_wait) 并在连接可用时收到侦听器进程 (pthread_cond_signal) 的通知。
另一个选择
根据处理请求的复杂性,可以选择使服务器 single-threaded,即在一个线程中处理所有连接。这完全消除了 context-switches,因此性能非常好。
一个缺点是,只使用了一个CPU-core。为了改善这一点,可以使用 hybrid-model:
- 每个核心创建一个 worker-thread。
- 每个线程同时处理 n 个连接。
但是,您必须实施机制以在工作人员之间公平地分配工作。
除了使用pthread_mutex,你会想要使用pthread_cond_t(pthread条件),这将允许你让线程池中的线程在它们实际上没有做的时候进入休眠状态工作。否则,如果他们坐在那里循环检查工作队列中的某些内容,您将浪费计算周期。
我肯定会考虑使用 C++ 而不是纯 C。我建议这样做的原因是在 C++ 中你可以使用模板。使用纯虚拟基础 class(我们称它为:"vtask"),您可以创建接受参数的模板化派生 classes 并在调用重载的 operator() 时插入参数,允许在您的任务中获得更多更多功能:
//============================================================================//
void* thread_pool::execute_thread()
{
vtask* task = NULL;
while(true)
{
//--------------------------------------------------------------------//
// Try to pick a task
m_task_lock.lock();
//--------------------------------------------------------------------//
// We need to put condition.wait() in a loop for two reasons:
// 1. There can be spurious wake-ups (due to signal/ENITR)
// 2. When mutex is released for waiting, another thread can be waken up
// from a signal/broadcast and that thread can mess up the condition.
// So when the current thread wakes up the condition may no longer be
// actually true!
while ((m_pool_state != state::STOPPED) && (m_main_tasks.empty()))
{
// Wait until there is a task in the queue
// Unlock mutex while wait, then lock it back when signaled
m_task_cond.wait(m_task_lock.base_mutex_ptr());
}
// If the thread was waked to notify process shutdown, return from here
if (m_pool_state == state::STOPPED)
{
//m_has_exited.
m_task_lock.unlock();
//----------------------------------------------------------------//
if(mad::details::allocator_list_tl::get_allocator_list_if_exists() &&
tids.find(CORETHREADSELF()) != tids.end())
mad::details::allocator_list_tl::get_allocator_list()
->Destroy(tids.find(CORETHREADSELF())->second, 1);
//----------------------------------------------------------------//
CORETHREADEXIT(NULL);
}
task = m_main_tasks.front();
m_main_tasks.pop_front();
//--------------------------------------------------------------------//
//run(task);
// Unlock
m_task_lock.unlock();
//--------------------------------------------------------------------//
// execute the task
run(task);
m_task_count -= 1;
m_join_lock.lock();
m_join_cond.signal();
m_join_lock.unlock();
//--------------------------------------------------------------------//
}
return NULL;
}
//============================================================================//
int thread_pool::add_task(vtask* task)
{
#ifndef ENABLE_THREADING
run(task);
return 0;
#endif
if(!is_alive_flag)
{
run(task);
return 0;
}
// do outside of lock because is thread-safe and needs to be updated as
// soon as possible
m_task_count += 1;
m_task_lock.lock();
// if the thread pool hasn't been initialize, initialize it
if(m_pool_state == state::NONINIT)
initialize_threadpool();
// TODO: put a limit on how many tasks can be added at most
m_main_tasks.push_back(task);
// wake up one thread that is waiting for a task to be available
m_task_cond.signal();
m_task_lock.unlock();
return 0;
}
//============================================================================//
void thread_pool::run(vtask*& task)
{
(*task)();
if(task->force_delete())
{
delete task;
task = 0;
} else {
if(task->get() && !task->is_stored_elsewhere())
save_task(task);
else if(!task->is_stored_elsewhere())
{
delete task;
task = 0;
}
}
}
在上面,每个创建的线程运行 execute_thread() 直到 m_pool_state 设置为 state::STOPPED。您锁定 m_task_lock,如果状态不是 STOPPED 且列表为空,则将 m_task_lock 传递给您的条件,这会使线程休眠并释放锁。您创建任务(未显示),添加任务(m_task_count 是一个原子,顺便说一下,这就是它是线程安全的原因)。在添加任务期间,发出条件信号以唤醒线程,线程从[=35=之后的execute_thread()的m_task_cond.wait(m_task_lock.base_mutex_ptr())部分继续进行] 已获取并锁定。
注意:这是一个高度自定义的实现,它将大部分 pthread functions/objects 封装到 C++ classes 中,因此 copy-and-pasting 将无法正常工作...抱歉。并且 w.r.t。 thread_pool::run(),除非您担心 return 值,否则 (*task)() 行就是您所需要的。
希望对您有所帮助。
编辑:m_join_* 参考资料用于检查是否所有任务都已完成。主线程处于类似的条件等待状态,检查是否所有任务都已完成,因为这对于我在继续之前使用此实现的应用程序是必要的。