c中多线程服务器的设计

Design of multi-threaded server in c

尝试在 linux 上实现具有并发支持的简单回显服务器时。

使用了以下方法:

该程序现在似乎可以运行了。

问题是:

这对我来说似乎不必要地复杂。多线程服务器的常用方法是:

  • 在线程进程中创建一个listen-socket
  • 在一个话题中接受 client-connections
  • 对于每个接受的客户端连接,创建一个新线程,它接收相应的文件描述符并完成工作
  • 工作线程在完全处理后关闭客户端连接

我认为在此处预先填充 thread-pool 没有多大好处。

如果你真的想要线程池:

我只想使用链表来接受连接,并使用 pthread_mutex 来同步对它的访问:

  • listener-process 将客户端 fds 排在列表的尾部。
  • 客户端在头部出队。

如果队列为空,线程可以等待变量 (pthread_cond_wait) 并在连接可用时收到侦听器进程 (pthread_cond_signal) 的通知。

另一个选择

根据处理请求的复杂性,可以选择使服务器 single-threaded,即在一个线程中处理所有连接。这完全消除了 context-switches,因此性能非常好。

一个缺点是,只使用了一个CPU-core。为了改善这一点,可以使用 hybrid-model:

  • 每个核心创建一个 worker-thread。
  • 每个线程同时处理 n 个连接。

但是,您必须实施机制以在工作人员之间公平地分配工作。

除了使用pthread_mutex,你会想要使用pthread_cond_t(pthread条件),这将允许你让线程池中的线程在它们实际上没有做的时候进入休眠状态工作。否则,如果他们坐在那里循环检查工作队列中的某些内容,您将浪费计算周期。

我肯定会考虑使用 C++ 而不是纯 C。我建议这样做的原因是在 C++ 中你可以使用模板。使用纯虚拟基础 class(我们称它为:"vtask"),您可以创建接受参数的模板化派生 classes 并在调用重载的 operator() 时插入参数,允许在您的任务中获得更多更多功能:

//============================================================================//

void* thread_pool::execute_thread()
{
    vtask* task = NULL;
    while(true)
    {
        //--------------------------------------------------------------------//
        // Try to pick a task
        m_task_lock.lock();
        //--------------------------------------------------------------------//

        // We need to put condition.wait() in a loop for two reasons:
        // 1. There can be spurious wake-ups (due to signal/ENITR)
        // 2. When mutex is released for waiting, another thread can be waken up
        //    from a signal/broadcast and that thread can mess up the condition.
        //    So when the current thread wakes up the condition may no longer be
        //    actually true!
        while ((m_pool_state != state::STOPPED) && (m_main_tasks.empty()))
        {
            // Wait until there is a task in the queue
            // Unlock mutex while wait, then lock it back when signaled
            m_task_cond.wait(m_task_lock.base_mutex_ptr());
        }

        // If the thread was waked to notify process shutdown, return from here
        if (m_pool_state == state::STOPPED)
        {
            //m_has_exited.
            m_task_lock.unlock();
            //----------------------------------------------------------------//
            if(mad::details::allocator_list_tl::get_allocator_list_if_exists() &&
               tids.find(CORETHREADSELF()) != tids.end())
                mad::details::allocator_list_tl::get_allocator_list()
                        ->Destroy(tids.find(CORETHREADSELF())->second, 1);
            //----------------------------------------------------------------//

            CORETHREADEXIT(NULL);
        }

        task = m_main_tasks.front();
        m_main_tasks.pop_front();
        //--------------------------------------------------------------------//
        //run(task);
        // Unlock
        m_task_lock.unlock();
        //--------------------------------------------------------------------//

        // execute the task
        run(task);

        m_task_count -= 1;
        m_join_lock.lock();
        m_join_cond.signal();
        m_join_lock.unlock();

        //--------------------------------------------------------------------//
    }
    return NULL;
}

//============================================================================//

int thread_pool::add_task(vtask* task)
{
#ifndef ENABLE_THREADING
    run(task);
    return 0;
#endif

    if(!is_alive_flag)
    {
        run(task);
        return 0;
    }

    // do outside of lock because is thread-safe and needs to be updated as
    // soon as possible
    m_task_count += 1;

    m_task_lock.lock();

    // if the thread pool hasn't been initialize, initialize it
    if(m_pool_state == state::NONINIT)
        initialize_threadpool();

    // TODO: put a limit on how many tasks can be added at most
    m_main_tasks.push_back(task);

    // wake up one thread that is waiting for a task to be available
    m_task_cond.signal();

    m_task_lock.unlock();

    return 0;
}

//============================================================================//

void thread_pool::run(vtask*& task)
{
    (*task)();

    if(task->force_delete())
    {
        delete task;
        task = 0;
    } else {
        if(task->get() && !task->is_stored_elsewhere())
            save_task(task);
        else if(!task->is_stored_elsewhere())
        {
            delete task;
            task = 0;
        }
    }
}

在上面,每个创建的线程运行 execute_thread() 直到 m_pool_state 设置为 state::STOPPED。您锁定 m_task_lock,如果状态不是 STOPPED 且列表为空,则将 m_task_lock 传递给您的条件,这会使线程休眠并释放锁。您创建任务(未显示),添加任务(m_task_count 是一个原子,顺便说一下,这就是它是线程安全的原因)。在添加任务期间,发出条件信号以唤醒线程,线程从[=35=之后的execute_thread()的m_task_cond.wait(m_task_lock.base_mutex_ptr())部分继续进行] 已获取并锁定。

注意:这是一个高度自定义的实现,它将大部分 pthread functions/objects 封装到 C++ classes 中,因此 copy-and-pasting 将无法正常工作...抱歉。并且 w.r.t。 thread_pool::run(),除非您担心 return 值,否则 (*task)() 行就是您所需要的。

希望对您有所帮助。

编辑:m_join_* 参考资料用于检查是否所有任务都已完成。主线程处于类似的条件等待状态,检查是否所有任务都已完成,因为这对于我在继续之前使用此实现的应用程序是必要的。