自定义线程池令人失望的性能

Disappointing performance of custom thread pool

我受困于某个旧的代码库,它只能用 Borland 编译器进行编译。该代码执行一些可以很好地并行运行的计算,但由于编译器不支持 OpenMP,我尝试围绕 WinAPI 的 CONDITION_VARIABLE 和 SRWLOCK 构建自己的线程池。线程池本身似乎工作正常,只是性能并不比单线程代码好多少。我很难相信锁定逻辑的开销会大到完全抵消并行处理的好处。快速查看 Process Explorer 后发现,我的 8 个工作线程(四核超线程 CPU)各自只占用了大约 0.5% 的 CPU 时间,这让我认为工作线程大部分时间都在休眠。

我在这里错过了什么?是的,我确信我尝试并行运行的部分就是最热的路径。

一些相关的代码片段:

/*
 * Synchronization point shared by the dispatching thread and the workers.
 * It bundles the lock, the two condition variables and the counter of
 * workers that still have to finish the current batch.
 */
class Barrier {
public:
    /* Initializes the lock and both condition variables; no work is outstanding yet. */
    Barrier(const int workers) :
            working(0),
            workers(workers)
    {
            ::InitializeSRWLock(&lock);
            ::InitializeConditionVariable(&waitForWork);
            ::InitializeConditionVariable(&workDone);
    }

    ~Barrier()
    {
    }

    /*
     * Starts the prepared batch and blocks until it is finished.
     *
     * The first statement releases "lock" in exclusive mode, so the caller
     * is expected to hold it exclusively on entry (WorkingBlock::Process
     * acquires it before preparing the workers).
     */
    void Rendezvous()
    {
            ::ReleaseSRWLockExclusive(&lock);

            /* Wake every worker sleeping on waitForWork */
            ::WakeAllConditionVariable(&waitForWork);

            /* Sleep on workDone until the workers have counted "working" down to zero */
            ::AcquireSRWLockExclusive(&lock);
            while (working > 0)
                    ::SleepConditionVariableSRW(&workDone, &lock, INFINITE, 0);
            ::ReleaseSRWLockExclusive(&lock);
    }

    /* Number of workers that have been handed work and have not reported completion yet */
    volatile long working;

    SRWLOCK lock;                   /* Guards "working" and the per-worker flags */
    CONDITION_VARIABLE waitForWork; /* Workers sleep here until a batch is ready */
    CONDITION_VARIABLE workDone;    /* Rendezvous() sleeps here until the batch is done */

private:
    /* Worker count passed to the constructor; not read anywhere in the code shown */
    const long workers;
};

/*
 * Per-thread state of one worker.
 *
 * The dispatcher sets "processing" (or "terminate") while holding the
 * barrier lock; the worker thread sleeps until one of them becomes true.
 * Instances are not copyable.
 */
class Worker {
public:
    Worker(Barrier *_bar) :
            /* Some worker data */
            processing(false),
            terminate(false),
            failed(false),
            hThread(NULL),
            threadId(0),
            bar(_bar)
    {
    }

    /* Some worker data */

    bool processing;    /* Set by the dispatcher when a job is ready, cleared by the worker when done */
    bool terminate;     /* Set to make the worker thread leave its loop */
    bool failed;

    HANDLE hThread;
    DWORD threadId;

    Barrier *bar;       /* Barrier shared with the dispatcher, not owned */

private:
    /* Copying is disabled: declared but intentionally not defined */
    Worker(const Worker &other);
    Worker & operator=(const Worker &other);
};

/*
 * Hands one job to every worker and waits for all of them to finish.
 *
 * The barrier lock is taken exclusively before the workers are set up and
 * is released inside Barrier::Rendezvous(), which also wakes the workers
 * and blocks until they are done.
 *
 * NOTE(review): the parameter list and the result processing are elided in
 * this excerpt, and no return statement is visible although the function
 * is declared to return bool.
 */
bool WorkingBlock::Process(/* Some worker data */)
{
    /* Keep the workers from observing half-prepared job data */
    ::AcquireSRWLockExclusive(&m_barrier->lock);
    for (int thr = 0; thr < int(m_NThreads); thr++) {
            Worker *wrk = m_workers->operator[](thr);
            /* Setup workers */
            PrepareWorker(wrk); /* This increments the "working" variable in barrier */
            wrk->processing = true;
    }

    /* Wait till workers finish */
    m_barrier->Rendezvous();
    
    /* Process results */
}

/*
 * Thread routine of a worker.
 *
 * Sleeps on the barrier's waitForWork condition until the worker is flagged
 * for processing or termination, runs the calculation, then reports back by
 * decrementing the barrier's "working" counter. The worker that brings the
 * counter to zero wakes the thread waiting on workDone.
 *
 * param is the Worker this thread serves. The return value is always 0.
 */
inline
DWORD WINAPI WorkerProc(LPVOID param)
{
    Worker *const self = static_cast<Worker *>(param);

    for (;;) {
        Barrier *const bar = self->bar;

        /* Wait (holding the lock in shared mode) for a job or a shutdown request */
        ::AcquireSRWLockShared(&bar->lock);
        while (!(self->processing || self->terminate))
            ::SleepConditionVariableSRW(&bar->waitForWork, &bar->lock,
                                        INFINITE, CONDITION_VARIABLE_LOCKMODE_SHARED);
        ::ReleaseSRWLockShared(&bar->lock);

        if (self->terminate)
            break;

        /* Do the calculation */

        self->processing = false;

        /* Report completion; the wake-up is issued after the lock is released */
        ::AcquireSRWLockExclusive(&bar->lock);
        bar->working--;
        const bool lastOneOut = (bar->working == 0);
        ::ReleaseSRWLockExclusive(&bar->lock);

        if (lastOneOut)
            ::WakeConditionVariable(&bar->workDone);
    }

    return 0;
}

我预先启动工作线程并让它们休眠并等待新一批工作准备就绪。我是否缺少某种同步冲突?

编辑:在代码中添加了 processing Worker 标志的用法。

经过更多调查后发现分析器结果需要一些额外的解释。此外,整个主循环中还有许多令人惊讶的低效代码。在处理了最坏的情况并在更多地方并行化循环后,我的性能得到了不错的提升。如果问题足够大,我可以在 4C/8T CPU 上获得高达 ~60% 的平均 CPU 利用率。它远不及 OpenMP,但总比没有好。

为了将来参考,这是我最终得到的用于并行化 for 循环的类似于 OpenMP 的线程池。

SThreadPool.h

#ifndef STHREADPOOL_H
#define STHREADPOOL_H

#include <stdexcept>
#include <vector>

/* Detect a C++11 capable compiler: either the language version macro says so
 * or the compiler is MSVC with _MSC_VER of at least 1900 */
#if (__cplusplus >= 201103L) || (_MSC_VER >= 1900)
        #define STP_HAVE_CPP11
#endif // CPP11 check

/* STP_NOTHROW marks functions that do not throw: "noexcept" where C++11 is
 * available, the older empty exception specification otherwise */
#ifdef STP_HAVE_CPP11
        #define STP_NOTHROW noexcept
#else
        #define STP_NOTHROW throw()
#endif // STP_HAVE_CPP11

namespace stpool {

/*!
 * Exception type thrown by the thread pool, e.g. by the ThreadPool
 * constructor. It carries only the message passed to the constructor.
 */
class Exception : public std::runtime_error {
public:
        /*!
         * @param[in] msg Message forwarded to std::runtime_error
         */
        explicit Exception(const char *msg);
};

/*!
 * Outcome of a worker for the most recent job, stored in Worker::result.
 */
enum WorkerResult {
        WR_SUCCESS,     /*!< Worker finished correctly, results are OK */
        WR_FAILURE,     /*!< Worker finished abnormally, results should be discarded */
        WR_SKIPPED,     /*!< Worker did not execute because the job size was too small */
        WR_INVALID      /*!< This state must never be returned; it is the placeholder
                             assigned when a worker is prepared, before it has run */
};

class Barrier;
class WorkerPrivate;

/*!
 * Public handle of one worker thread of a ThreadPool.
 * Callers read "result" after ThreadPool::Process() has returned.
 * Workers are not copyable.
 */
class Worker {
public:
        Worker();

        WorkerResult result;    /*!< Result of last worker cycle */
        WorkerPrivate *priv;    /*!< Internal worker data, do not touch! */

private:
        /* Copying is disabled: declared private */
        Worker(const Worker &other);
        Worker & operator=(const Worker &other);
};

/*! Pointers to the workers of a pool */
typedef std::vector<Worker *> WorkerVec;
/*! Read-only reference to a WorkerVec, the return type of ThreadPool::Process() */
typedef const std::vector<Worker *> & CWorkerVecRef;
/*! Opaque pointer to caller-defined data, handed unchanged to the WorkerFunc */
typedef void * Payload;
/*! Payloads of one parallel job, one entry per worker thread */
typedef std::vector<Payload> PayloadVec;

/*!
 * WorkerFunc prototype.
 *
 * The function is called on a worker thread with the portion of the loop
 * assigned to that worker.
 *
 * @param[in] from First index of the portion of the loop
 * @param[in] to Last index of the portion of the loop. For a job started with
 *               TPOL_EXCLUSIVE this index is not part of the portion, for
 *               TPOL_INCLUSIVE it is.
 * @param[in,out] p Data specific for the given job
 *
 * @retval true Job succeeded, reported as WR_SUCCESS
 * @retval false Job failed, reported as WR_FAILURE
 */
typedef bool (*WorkerFunc)(const int from, const int to, Payload p);

/*!
 * Specifies the condition for the last element in the loop, i.e. whether
 * the "to" index passed to ThreadPool::Process() is itself processed.
 */
enum TerminationPolicy {
        TPOL_INCLUSIVE, /*!< "<=" terminating condition */
        TPOL_EXCLUSIVE  /*!< "<" terminating condition */
};

/*!
 * ThreadPool object capable of executing for-loops in parallel.
 * Think of this as poor man's OpenMP...
 *
 * The worker threads are created by the constructor and stopped and joined
 * by the destructor. The pool is not copyable.
 */
class ThreadPool {
public:
        /*!
         * ThreadPool c-tor
         *
         * @param[in] NThreads Number of worker threads to prepare
         *
         * @throws Exception if NThreads is less than 1 or if a worker thread
         *                   cannot be created
         */
        ThreadPool(const long NThreads);
        ~ThreadPool();

        /*!
         * Runs the parallel job. Payloads must be set up before this function
         * is called. The call returns only after every worker that was given
         * a portion of the loop has finished.
         *
         * Only the TPOL_INCLUSIVE and TPOL_EXCLUSIVE specializations exist;
         * they are defined in SThreadPool.cpp.
         *
         * @tparam Policy Whether "to" is itself part of the loop
         *
         * @param[in] from First index in the loop
         * @param[in] to Last index in the loop
         * @param[in,out] payloads Vector of data specific for the given loop.
         *                          Size of the vector must be the same as the number
         *                          of worker threads.
         * @param[in] func Function that performs the actual calculation
         *
         * @return Vector of finished workers. This is a reference to the
         *         pool's own worker list; inspect Worker::result of each entry.
         */
        template <TerminationPolicy Policy>
        CWorkerVecRef Process(const int from, const int to,
                              const PayloadVec &payloads, WorkerFunc func) STP_NOTHROW;

        /*!
         * Returns number of available worker threads
         *
         * @return Number of available worker threads
         */
        long Threads() const;

private:
        /* Copying is disabled: declared private */
        ThreadPool(const ThreadPool &other);
        ThreadPool & operator=(const ThreadPool &other);

        /* Stops and joins all worker threads and frees the worker objects */
        void Cleanup();
        /* Hands the portion [from, to] of the loop to the given worker */
        void PrepareWorker(Worker *wrk, const int from, const int to,
                           Payload payload, WorkerFunc func);
        /* Marks the given worker as WR_SKIPPED for the current job */
        void SkipWorker(Worker *wrk);

        WorkerVec m_workers;    /* One entry per worker thread */
        Barrier *m_barrier;     /* Synchronization shared with the worker threads */

        const long m_NThreads;  /* Number of worker threads requested at construction */
};

/*!
 * Number of available logical CPUs
 *
 * @return The dwNumberOfProcessors value reported by GetSystemInfo()
 */
int NumOfCPUs();

} // namespace stpool

#endif // STHREADPOOL_H

SThreadPool.cpp

#include "SThreadPool.h"

#include <Windows.h>

#include <cassert>

/* Build-time switches:
 *  STP_USE_SRWLOCK        1: the barrier lock is an SRWLOCK,
 *                         0: the barrier lock is a CRITICAL_SECTION
 *  STP_USE_SYSTEM_SRWLOCK 0: SRWLOCK, PSRWLOCK and SRWLOCK_INIT are declared
 *                            below in terms of their RTL_ counterparts
 *                            (presumably for SDK headers that lack them -
 *                            TODO confirm)
 *  STP_SPIN_FOR_RDVZ      1: Barrier::Rendezvous() busy-waits for the workers
 *                            instead of sleeping on a condition variable
 */
#define STP_USE_SRWLOCK 1
#define STP_USE_SYSTEM_SRWLOCK 0
#define STP_SPIN_FOR_RDVZ 0

/* Spinning for rendezvous seems to be more efficient only
 * when there are fewer workers than available CPUs */

/* The macros below hide the chosen lock primitive behind one set of names.
 * With CRITICAL_SECTION there is no shared mode, so the "Shared" variants
 * map to the same calls as the "Excl" ones. */
#if STP_USE_SRWLOCK
        #if !STP_USE_SYSTEM_SRWLOCK
                #define SRWLOCK_INIT RTL_SRWLOCK_INIT
                typedef RTL_SRWLOCK SRWLOCK, *PSRWLOCK;
        #endif // USE_SYSTEM_SRWLOCK

        #define LOCK_PRIM SRWLOCK
        #define InitLock(lk) ::InitializeSRWLock(lk)
        /* Expands to nothing for SRWLOCK */
        #define DeleteLock(lk)
        #define AcquireLkExcl(lk) ::AcquireSRWLockExclusive(lk)
        #define AcquireLkShared(lk) ::AcquireSRWLockShared(lk)
        #define ReleaseLkExcl(lk) ::ReleaseSRWLockExclusive(lk)
        #define ReleaseLkShared(lk) ::ReleaseSRWLockShared(lk)
        #define WaitCondExcl(wc, lk) ::SleepConditionVariableSRW(wc, lk, INFINITE, 0)
        #define WaitCondShared(wc, lk) ::SleepConditionVariableSRW(wc, lk, INFINITE, CONDITION_VARIABLE_LOCKMODE_SHARED)
#else
        #define LOCK_PRIM CRITICAL_SECTION
        #define InitLock(lk) ::InitializeCriticalSectionAndSpinCount(lk, 5000)
        #define DeleteLock(lk) ::DeleteCriticalSection(lk)
        #define AcquireLkExcl(lk) ::EnterCriticalSection(lk)
        #define AcquireLkShared(lk) ::EnterCriticalSection(lk)
        #define ReleaseLkExcl(lk) ::LeaveCriticalSection(lk)
        #define ReleaseLkShared(lk) ::LeaveCriticalSection(lk)
        #define WaitCondExcl(wc, lk) ::SleepConditionVariableCS(wc, lk, INFINITE)
        #define WaitCondShared(wc, lk) ::SleepConditionVariableCS(wc, lk, INFINITE)
#endif // STP_USE_SRWLOCK

namespace stpool {

/* Forwards the message to the std::runtime_error base class */
Exception::Exception(const char *msg) :
        std::runtime_error(msg)
{
}

/*
 * Synchronization state shared by the dispatching thread and the worker
 * threads: the lock, the condition variable workers sleep on, the counter
 * of workers still busy and, unless STP_SPIN_FOR_RDVZ is enabled, the
 * condition variable the dispatcher sleeps on.
 */
class Barrier {
public:
        /* Initializes the lock and the condition variable(s); no work is outstanding yet */
        Barrier(const long workers) :
                working(0),
                workers(workers)
        {
                InitLock(&lock);
                ::InitializeConditionVariable(&waitForWork);
        #if !STP_SPIN_FOR_RDVZ
                ::InitializeConditionVariable(&workDone);
        #endif // STP_SPIN_FOR_RDVZ
        }

        ~Barrier()
        {
                /* Expands to nothing when the lock is an SRWLOCK */
                DeleteLock(&lock);
        }

        /* Starts the prepared job and blocks until "working" is back to zero */
        void Rendezvous();

        /* Number of workers that were handed work and have not reported completion yet */
        volatile long working;

        LOCK_PRIM lock;                 /* Guards the job data handed to the workers */
        CONDITION_VARIABLE waitForWork; /* Workers sleep here until a job or shutdown is signalled */
#if !STP_SPIN_FOR_RDVZ
        CONDITION_VARIABLE workDone;    /* Rendezvous() sleeps here until the job is done */
#endif // STP_SPIN_FOR_RDVZ

private:
        /* Worker count passed to the constructor; not read anywhere in this file */
        const long workers;
};

/*
 * Releases the prepared job to the workers and waits for its completion.
 *
 * The first statement releases "lock" in exclusive mode, so the caller must
 * hold it exclusively on entry (ThreadPool::Process() acquires it before
 * preparing the workers). The lock is not held when the function returns.
 */
void Barrier::Rendezvous()
{
        ReleaseLkExcl(&lock);

        /* Wake every worker sleeping on waitForWork */
        ::WakeAllConditionVariable(&waitForWork);

#if !STP_SPIN_FOR_RDVZ
        /* Sleep until the workers have counted "working" down to zero */
        AcquireLkExcl(&lock);
        while (working > 0)
                WaitCondExcl(&workDone, &lock);
        ReleaseLkExcl(&lock);
#else
        /* Busy-wait variant: poll the counter without taking the lock */
        while (working > 0)
                YieldProcessor();
#endif // STP_SPIN_FOR_RDVZ
}

/*
 * Creates a worker that is not attached to any internal data yet.
 * "priv" starts out as NULL so that it never holds an indeterminate value
 * before the owning ThreadPool assigns it.
 */
Worker::Worker() :
        result(WR_FAILURE),
        priv(NULL)
{
}

/*
 * Internal per-worker data: the job currently assigned to the worker, the
 * flags that control its thread and the bookkeeping of the thread itself.
 */
class WorkerPrivate {
public:
        /* Starts with no job assigned, no thread attached and no barrier */
        WorkerPrivate() :
                from(-1),
                to(-1),
                payload(NULL),
                func(NULL),
                process(false),
                terminate(false),
                failed(false),
                threadId(0),
                hThread(NULL),
                barrier(NULL)
        {
        }

        /* Current job: the arguments ThreadProc passes to func */
        int from;
        int to;
        Payload payload;
        WorkerFunc func;

        bool process;   /* Set when a job is ready, cleared by the worker thread after running it */
        bool terminate; /* Set to make the worker thread exit */
        bool failed;    /* When set, ThreadPool::Cleanup() does not join the thread;
                           never set to true in this file */

        DWORD threadId; /* Thread id reported by CreateThread */
        HANDLE hThread; /* Thread handle, closed by ThreadPool::Cleanup() */
        Barrier *barrier; /* Barrier of the owning pool, not owned by the worker */
};

/*
 * Entry point of every worker thread.
 *
 * The thread sleeps on the barrier's waitForWork condition until its
 * "process" or "terminate" flag is set. On "terminate" it exits; otherwise
 * it runs the assigned function on the assigned range, stores the outcome
 * in Worker::result, clears "process" and decrements the barrier's
 * "working" counter so that Barrier::Rendezvous() can return.
 *
 * @param[in] param Pointer to the Worker served by this thread; its "priv"
 *                  member is read immediately, so it must be valid before
 *                  the thread starts running
 *
 * @return Always 0
 */
static
DWORD WINAPI ThreadProc(LPVOID param)
{
        Worker *wrk = static_cast<Worker *>(param);
        WorkerPrivate *priv = wrk->priv;

        while (true) {
        #ifdef STP_PRN_TPTS
                {
                AnsiString str("Worker waiting: ");
                /* The thread id is kept in WorkerPrivate; Worker has no such member */
                str += priv->threadId;
                OutputDebugStringA(str.c_str());
                }
        #endif // STP_PRN_TPTS

                /* Wait for a job or for a shutdown request */
                AcquireLkShared(&priv->barrier->lock);
                while (!priv->process && !priv->terminate)
                        WaitCondShared(&priv->barrier->waitForWork, &priv->barrier->lock);
                ReleaseLkShared(&priv->barrier->lock);

                if (priv->terminate)
                        return 0;

                assert(priv->payload != NULL);
                assert(priv->func != NULL);

                const bool ret = priv->func(priv->from, priv->to, priv->payload);
                wrk->result = ret ? WR_SUCCESS : WR_FAILURE;

                /* Cleared before the completion is reported below, so the flag is
                 * already down when the dispatcher prepares the next job */
                priv->process = false;

        #ifdef STP_PRN_TPTS
                {
                AnsiString str("Worker done: ");
                str += priv->threadId;
                OutputDebugStringA(str.c_str());
                }
        #endif // STP_PRN_TPTS

        #if !STP_SPIN_FOR_RDVZ
                /* Report completion; the worker that finishes last wakes the dispatcher.
                 * The wake-up is issued after the lock has been released. */
                AcquireLkExcl(&priv->barrier->lock);
                priv->barrier->working--;
                if (priv->barrier->working == 0) {
                        ReleaseLkExcl(&priv->barrier->lock);
                        ::WakeConditionVariable(&priv->barrier->workDone);
                } else
                        ReleaseLkExcl(&priv->barrier->lock);
        #else
                /* The dispatcher polls the counter, no wake-up is needed */
                ::InterlockedDecrement(&priv->barrier->working);
        #endif // SPIN_FOR_RDVZ
        }

        return 0;
}

/*
 * Creates the barrier and starts NThreads worker threads, which immediately
 * go to sleep waiting for work.
 *
 * @param[in] NThreads Number of worker threads to prepare
 *
 * @throws Exception if NThreads is less than 1 or if a thread cannot be
 *                   created. In the latter case the threads created so far
 *                   are stopped and all resources are released first.
 */
ThreadPool::ThreadPool(const long NThreads) :
        m_NThreads(NThreads)
{
        if (m_NThreads < 1)
                throw Exception("Invalid argument");

        m_barrier = new Barrier(m_NThreads);

        m_workers.reserve(NThreads);
        for (long thr = 0; thr < m_NThreads; thr++) {
                Worker *wrk = new Worker();
                WorkerPrivate *priv = new WorkerPrivate;

                /* The worker must be fully wired up BEFORE its thread exists:
                 * ThreadProc reads wrk->priv and priv->barrier as soon as it runs. */
                priv->barrier = m_barrier;
                wrk->priv = priv;

                HANDLE hThread = ::CreateThread(NULL, 0, ThreadProc, wrk, 0, &priv->threadId);
                if (hThread == NULL) {
                        delete priv;
                        delete wrk;

                        /* Stop and free the workers created so far. The destructor does
                         * not run for an object whose constructor throws, so the barrier
                         * has to be released here as well - after Cleanup(), which still
                         * needs its lock. */
                        Cleanup();
                        delete m_barrier;
                        m_barrier = NULL;

                        throw Exception("Failed to initialize thread pool");
                }

                /* The handle is used only by Cleanup(), never by the thread itself */
                priv->hThread = hThread;

                m_workers.push_back(wrk);
        }
}

/*
 * Stops and joins the worker threads, then frees the barrier.
 * The barrier is deleted last because Cleanup() uses its lock and
 * condition variable.
 */
ThreadPool::~ThreadPool()
{
        Cleanup();

        delete m_barrier;
}

void ThreadPool::Cleanup()
{
        AcquireLkExcl(&m_barrier->lock);
        for (size_t idx = 0; idx < m_workers.size(); idx++)
                m_workers[idx]->priv->terminate = true;
        ReleaseLkExcl(&m_barrier->lock);

        ::WakeAllConditionVariable(&m_barrier->waitForWork);

        for (size_t idx = 0; idx < m_workers.size(); idx++) {
                Worker *wrk = m_workers[idx];

                if (!wrk->priv->failed) {
                        ::WaitForSingleObject(wrk->priv->hThread, INFINITE);
                        ::CloseHandle(wrk->priv->hThread);
                }

                delete wrk->priv;
                delete wrk;
        }
}

/*
 * Runs func over the half-open range [from, to) in parallel.
 *
 * The range is cut into slices of round((to - from) / NThreads) indices.
 * Each worker but the last gets one slice, the last worker gets whatever
 * remains. A worker whose portion would be empty is not woken up at all and
 * is reported as WR_SKIPPED; every worker therefore ends up with a result
 * that belongs to this call. The function returns after all dispatched
 * workers have finished.
 *
 * @param[in] from First index in the loop
 * @param[in] to Index one past the last index in the loop, must be greater
 *               than from
 * @param[in,out] payloads Per-worker data, one entry per worker thread
 * @param[in] func Function that performs the actual calculation
 *
 * @return Reference to the pool's workers; check Worker::result of each
 */
template <>
CWorkerVecRef ThreadPool::Process<TPOL_EXCLUSIVE>(const int from, const int to,
                                                  const PayloadVec &payloads, WorkerFunc func) STP_NOTHROW
{
        assert(to > from);
        assert(m_workers.size() == payloads.size());

        const int slice = int(float(to - from) / m_NThreads + 0.5F);

#ifdef STP_PRN_TPTS
        OutputDebugStringA("--- Para start");
#endif // STP_PRN_TPTS

        int idx = from;

        /* Held while the jobs are set up; released inside Rendezvous() */
        AcquireLkExcl(&m_barrier->lock);
        for (long thr = 0; thr < m_NThreads - 1; thr++) {
                Worker *wrk = m_workers[thr];

                int realTo = idx + slice;
                if (realTo > to)
                        realTo = to;

                /* [idx, realTo) is empty: either the range is used up or the slice
                 * size rounded down to zero. Keep going instead of leaving the loop
                 * so that the remaining workers are marked as skipped, too. */
                if (realTo <= idx) {
                        SkipWorker(wrk);
                        continue;
                }

                PrepareWorker(wrk, idx, realTo, payloads[thr], func);
                idx += slice;
        }

        /* The last worker takes the remainder of the range, if there is any */
        Worker *wrk = m_workers.back();
        Payload pl = payloads.back();
        if (idx < to)
                PrepareWorker(wrk, idx, to, pl, func);
        else
                SkipWorker(wrk);

        m_barrier->Rendezvous();

        return m_workers;
}

/*
 * Runs func over the closed range [from, to] in parallel.
 *
 * With slice = round((to - from) / NThreads), each worker but the last gets
 * the indices [idx, idx + slice] (clamped to "to"), the last worker gets
 * whatever remains. Workers left without any index are reported as
 * WR_SKIPPED; every worker therefore ends up with a result that belongs to
 * this call. The function returns after all dispatched workers have
 * finished.
 *
 * @param[in] from First index in the loop
 * @param[in] to Last index in the loop, must not be less than from
 * @param[in,out] payloads Per-worker data, one entry per worker thread
 * @param[in] func Function that performs the actual calculation
 *
 * @return Reference to the pool's workers; check Worker::result of each
 */
template <>
CWorkerVecRef ThreadPool::Process<TPOL_INCLUSIVE>(const int from, const int to,
                                                  const PayloadVec &payloads, WorkerFunc func) STP_NOTHROW
{
        assert(to >= from);
        assert(m_workers.size() == payloads.size());

        const int slice = int(float(to - from) / m_NThreads + 0.5F);

#ifdef STP_PRN_TPTS
        OutputDebugStringA("--- Para start");
#endif // STP_PRN_TPTS

        int idx = from;

        /* Held while the jobs are set up; released inside Rendezvous() */
        AcquireLkExcl(&m_barrier->lock);
        for (long thr = 0; thr < m_NThreads - 1; thr++) {
                Worker *wrk = m_workers[thr];
                Payload pl = payloads[thr];

                /* The range is used up. Keep going instead of leaving the loop so
                 * that the remaining workers are marked as skipped, too; otherwise
                 * they would keep the result of an earlier job. */
                if (idx > to) {
                        SkipWorker(wrk);
                        continue;
                }

                int realTo = idx + slice;
                if (realTo > to)
                        realTo = to;

                PrepareWorker(wrk, idx, realTo, pl, func);
                /* realTo itself was processed, the next portion starts right after it */
                idx += slice + 1;
        }

        /* The last worker takes the remainder of the range, if there is any */
        Worker *wrk = m_workers.back();
        Payload pl = payloads.back();
        if (idx <= to)
                PrepareWorker(wrk, idx, to, pl, func);
        else
                SkipWorker(wrk);

        m_barrier->Rendezvous();

        return m_workers;
}

/*
 * Assigns a job to a worker and counts it as outstanding.
 *
 * Both Process() specializations call this while holding the barrier lock
 * exclusively; the worker thread picks the job up once Rendezvous() has
 * released the lock and woken it.
 *
 * wrk is the worker to set up, from and to delimit its portion of the loop,
 * payload and func are handed to the worker thread unchanged.
 */
void ThreadPool::PrepareWorker(Worker *wrk, const int from, const int to,
                               Payload payload, WorkerFunc func)
{
        WorkerPrivate *priv = wrk->priv;

        /* Placeholder until the worker thread stores the real outcome */
        wrk->result = WR_INVALID;

        priv->from = from;
        priv->to = to;
        priv->payload = payload;
        priv->func = func;
        priv->process = true;

        /* One more worker Rendezvous() has to wait for */
        m_barrier->working++;
}

/*
 * Marks a worker that gets no portion of the current job as WR_SKIPPED.
 * The worker thread is not woken up and is not counted as outstanding.
 */
void ThreadPool::SkipWorker(Worker *wrk)
{
        wrk->result = WR_SKIPPED;
}

/* Returns the number of worker threads the pool was constructed with */
long ThreadPool::Threads() const
{
        return m_NThreads;
}

int NumOfCPUs()
{
        SYSTEM_INFO info;

        GetSystemInfo(&info);
        return info.dwNumberOfProcessors;
}

} // namespace stpool