通用线程池 class 工作不正常

Generic thread pool class is not working properly

我正在尝试创建一个线程池 class,它接收多个函数并将它们放入队列直到它们完成,然后我可以添加另一个函数来利用创建的线程而不是在我要运行其他功能。这就是为什么我包含一个条件变量来同步所有线程。

但是,代码无法正常工作,因为调用该函数时对象会以某种方式进行复制。几次尝试后,我不知道我错过了什么!

我期望 hw 对象的成员函数 greetings 与他的索引并行执行。但是当 (o.*f)(std::forward<Args>(args)...); 行被执行时,对象被复制,尽管复制构造函数被删除。因此,当它进入 greetings 成员时,它会产生一个 SEGMENTATION FAULT.

CMakeLists.txt

cmake_minimum_required(VERSION 3.5)

project(boost_asyo LANGUAGES CXX)

set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

add_executable(boost_asyo main.cpp)
target_link_libraries(${PROJECT_NAME} boost_thread boost_system)

main.cpp

#include <iostream>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <memory>

#include <mutex>
#include <condition_variable>

class Semaphore
{
    std::mutex lock;

    std::condition_variable cond;

    int count;

public:

    Semaphore()
    {
        count = 0;
    }

    void wait()
    {
        std::unique_lock<std::mutex> m(lock);

        while(count > 0)
            cond.wait(m, [this]{ return count == 0; });
    }

    void take()
    {
        std::unique_lock m(lock);

        count++;
    }

    void give()
    {
        std::unique_lock m(lock);

        count--;

        if(count == 0)
        {
            cond.notify_one();
        }
    }
};


class ThreadPool
{
private:
    boost::asio::io_service m_io_service;
    std::unique_ptr<boost::asio::io_service::work> m_work;
    boost::thread_group m_threads;
    Semaphore m_sem;

public:
    ThreadPool(size_t n)
    {
        this->m_work = std::make_unique<boost::asio::io_service::work>(m_io_service);

        for (size_t ii = 0; ii < n; ii++)
        {
            m_threads.create_thread(boost::bind(&boost::asio::io_service::run, &this->m_io_service));
        }
    }

    ThreadPool(const ThreadPool & v) = delete;
    ThreadPool(ThreadPool && v) = delete;

    ~ThreadPool()
    {
        m_io_service.stop();
    }

    template<class type, class T, class T1, class... Args>
    auto post(type T::*f, T1 &obj, Args... args)
    {
        this->m_sem.take();
        this->m_io_service.post([&] ()
        {
            T o = static_cast<T&&>(obj);

            (o.*f)(std::forward<Args>(args)...);
            this->m_sem.give();
        });

    }

    void wait()
    {
        this->m_sem.wait();
    }
};

class HelloWorld
{
private:

public:
    std::string m_str;
    HelloWorld(std::string str) : m_str(str) {};
    HelloWorld(const HelloWorld& v) = delete;
    HelloWorld(HelloWorld&& v) = default;

    ~HelloWorld() = default;

    void greetings(int ii)
    {
        for (int jj = 0; jj < 5; jj++)
        {
            std::cout << this->m_str << " " << ii <<  std::endl;

            boost::this_thread::sleep_for(boost::chrono::seconds(1));
        }

    }
};


int main()
{
    ThreadPool tp(8);

    HelloWorld hw("Hola mundo");

    for (int ii = 0; ii < 5; ii++)
    {
        tp.post(&HelloWorld::greetings, hw, ii);
    }

    tp.wait();

    return 0;
}

此代码基于此代码,它可以正常工作,这与我想对 classes 和成员所做的类似。

#include <iostream>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <memory>

#include <mutex>
#include <condition_variable>

class Semaphore
{
    std::mutex lock;

    std::condition_variable cond;

    int count;

public:

    Semaphore()
    {
        count = 0;
    }

    void wait()
    {
        std::unique_lock<std::mutex> m(lock);

        while(count > 0)
            cond.wait(m, [this]{ return count == 0; });
    }

    void take()
    {
        std::unique_lock m(lock);

        count++;
    }

    void give()
    {
        std::unique_lock m(lock);

        count--;

        if(count == 0)
        {
            cond.notify_one();
        }
    }
};


int main()
{    
    boost::asio::io_service io_service;
    std::unique_ptr<boost::asio::io_service::work> work = std::make_unique<boost::asio::io_service::work>(io_service);

    boost::thread_group threads;

    for (size_t ii = 0; ii < 2; ii++)
    {
        std::cout << "id: " << ii << std::endl;
        threads.create_thread(boost::bind(&boost::asio::io_service::run, &io_service));
    }

    Semaphore sem;

    for (size_t ii = 0; ii < 3; ii++)
    {
        //Take
        sem.take();

        io_service.post([ii, &sem] ()
        {
            int id = 0;
            while(id < 5)
            {
                id++;
                printf("hello world %i\n", static_cast<int>(ii));
                boost::this_thread::sleep_for(boost::chrono::seconds(1));
            }

            //Give
            sem.give();
        });
    }


    sem.wait();


    for (size_t ii = 0; ii < 3; ii++)
    {
        sem.take();

        io_service.post([ii, &sem] ()
        {
            int id = 0;
            while(id < 5)
            {
                id++;
                printf("bye world %i\n", static_cast<int>(ii));
                boost::this_thread::sleep_for(boost::chrono::seconds(1));
            }
            sem.give();
        });
    }

    sem.wait();

    io_service.stop();

    return 0;
}


我很好奇信号量是什么意思。

io_service 已经是一个任务队列。它是线程安全的,您不需要信号量。

For comparison, here's io_service based thread pool:

  • A thread pool like yours, without Asio, using a condition variable in much the same way but without calling it "semaphore" boost thread throwing exception "thread_resource_error: resource temporarily unavailable"

  • The same thing, but rewritten around io_service like your ThreadPool, which shows how you don't need the semaphore anymore [Solution 1] in c++ work queues with blocking (Solution 2 is using the same threadpool as before).

(Even better, recent Asio versions have a built-in thread-pool).

哪里出错了

这是不安全的:

template <class type, class T, class T1, class... Args>
auto post(type T::*f, T1& obj, Args... args) {
    this->m_sem.take();
    this->m_io_service.post([&]() {
        T o = static_cast<T&&>(obj);
        (o.*f)(std::forward<Args>(args)...);
        this->m_sem.give();
    });
}

具体来说:

  1. T o = static_cast<T&&>(obj);
    

    不复制 T(即 HelloWorld)。你知道那是因为那是不可能的。更糟糕的是:对象从 obj.

    移动

    Incidentally, this assumes that T is move-constructible from T1.

    您通过将右手边显式转换为右值引用来明确要求它。

    This is what std::move is specified to do, actually: "In particular, std::move produces an xvalue expression that identifies its argument t. It is exactly equivalent to a static_cast to an rvalue reference type."

    效果是 main 中的 HelloWorld 实例不再有效,但您继续从它移动以执行后续任务。

  2. 通过引用捕获的其他参数。这意味着它们在任务实际执行之前超出范围(包括 f)。

为了确保安全,您必须在本地副本中捕获参数:

template <class type, class T, class... Args>
auto post(type T::*f, T&& obj, Args... args) {
    this->m_sem.take();
    this->m_io_service.post([=, o = std::move(obj)]() mutable {
        try {
            (o.*f)(args...);
        } catch (...) {
            this->m_sem.give();
            throw;
        }
        this->m_sem.give();
    });
}

备注:

  1. 现在 objrvalue 引用。这意味着 post 不会编译,除非 obj 是一个右值。

    Note this is not a because T is deduced as part of f.

  2. lambda 现在是可变的(因为否则只有 const 成员函数可以 运行 在捕获的 o 上)

  3. 复制所有其他参数 - 这大致是 std::bind 的操作方式,但您可以针对可移动参数进行优化。

  4. 我们处理异常 - 在您的代码中,如果 f 抛出,您永远不会 give() 信号量

当然,main 需要适应,因此多个 HelloWorld 实例实际上是由右值创建和传递的:

for (int ii = 0; ii < 5; ii++) {
    HelloWorld hw("Hola mundo");
    tp.post(&HelloWorld::greetings, std::move(hw), ii);
}

但是 - 它不起作用

至少,对我来说它不能编译。 Asio 要求处理程序可复制 (, How to trick boost::asio to allow move-only handlers)。

此外,我们几乎没有触及表面。通过对 type T::*f 进行硬编码,您需要新的 post 重载很多东西:静态方法、const 成员函数 ...

相反,为什么不用 C++ 方式:

template <class F, class... Args>
auto post(F&& f, Args&&... args) {
    this->m_sem.take();
    this->m_io_service.post(
        [this, f=std::bind(std::forward<F>(f), std::forward<Args>(args)...)] 
        {
            try { f(); }
            catch (...) {
                this->m_sem.give();
                throw;
            }
            this->m_sem.give();
        });
}

实际上,在更现代的 C++ 中,您会编写(假设此处为 c++17):

    //...
        [this, f=std::forward<F>(f), args=std::make_tuple(std::forward<Args>(args)...)] 
        {
            try { std::apply(f, args); }
    //...

Oh, and we still need

#define BOOST_ASIO_DISABLE_HANDLER_TYPE_REQUIREMENTS 1

because of the move-only handler type

完整修复版演示

NOTE: Also added an output mutex (s_outputmx) to avoid intermixed console output.

Live On Coliru

#define BOOST_ASIO_DISABLE_HANDLER_TYPE_REQUIREMENTS 1
#include <iostream>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <memory>

#include <mutex>
#include <condition_variable>

class Semaphore {
    std::mutex lock;
    std::condition_variable cond;
    int count;
  public:
    Semaphore() { count = 0; }
    void wait() {
        std::unique_lock<std::mutex> m(lock);
        while (count > 0)
            cond.wait(m, [this] { return count == 0; });
    }
    void take() {
        std::unique_lock m(lock);
        count++;
    }
    void give() {
        std::unique_lock m(lock);
        count--;
        if (count == 0) {
            cond.notify_one();
        }
    }
};


class ThreadPool {
  private:
    boost::asio::io_service m_io_service;
    std::unique_ptr<boost::asio::io_service::work> m_work;
    boost::thread_group m_threads;
    Semaphore m_sem;

  public:
    ThreadPool(size_t n) {
        this->m_work =
            std::make_unique<boost::asio::io_service::work>(m_io_service);
        for (size_t ii = 0; ii < n; ii++) {
            m_threads.create_thread(boost::bind(&boost::asio::io_service::run,
                                                &this->m_io_service));
        }
    }
    ThreadPool(const ThreadPool& v) = delete;
    ThreadPool(ThreadPool&& v) = delete;
    ~ThreadPool() { m_io_service.stop(); }

    template <class F, class... Args>
    auto post(F&& f, Args&&... args) {
        this->m_sem.take();
        this->m_io_service.post(
#if 1 // pre-c++17
            [this, f=std::bind(std::forward<F>(f), std::forward<Args>(args)...)] 
            {
                try { f(); }
#else // https://en.cppreference.com/w/cpp/utility/apply
            [this, f=std::forward<F>(f), args=std::make_tuple(std::forward<Args>(args)...)] 
            {
                try { std::apply(f, args); }
#endif
                catch (...) {
                    this->m_sem.give();
                    throw;
                }
                this->m_sem.give();
            });
    }


    void wait() { this->m_sem.wait(); }
};

struct HelloWorld {
    std::string m_str;

    HelloWorld(std::string str) : m_str(str){};
    HelloWorld(const HelloWorld& v) = delete;
    HelloWorld(HelloWorld&& v) = default;
    ~HelloWorld() = default;

    void greetings(int ii) const {
        for (int jj = 0; jj < 5; jj++) {
            {
                static std::mutex s_outputmx;
                std::lock_guard<std::mutex> lk(s_outputmx);
                std::cout << this->m_str << " " << ii << std::endl;
            }
            boost::this_thread::sleep_for(boost::chrono::seconds(1));
        }
    }
};

int main()
{
    ThreadPool tp(8);

    for (int ii = 0; ii < 5; ii++) {
        HelloWorld hw("Hola mundo");
        tp.post(&HelloWorld::greetings, std::move(hw), ii);
    }

    tp.wait();
}

打印

Hola mundo 0
Hola mundo 2
Hola mundo 3
Hola mundo 1
Hola mundo 4
Hola mundo 0
Hola mundo 1
Hola mundo 4
Hola mundo 2
Hola mundo 3
Hola mundo 0
Hola mundo 1
Hola mundo 4
Hola mundo 2
Hola mundo 3
Hola mundo 0
Hola mundo 4
Hola mundo 2
Hola mundo 3
Hola mundo 1
Hola mundo 0
Hola mundo 4
Hola mundo 2
Hola mundo 1
Hola mundo 3

奖励:丢弃信号量

丢弃信号量并实际使用 work:

class ThreadPool {
    boost::asio::io_service m_io_service;
    std::unique_ptr<boost::asio::io_service::work> m_work;
    boost::thread_group m_threads;

  public:
    ThreadPool(size_t n)
        : m_work(std::make_unique<boost::asio::io_service::work>(m_io_service))
    {
        while (n--) {
            m_threads.create_thread([this] { m_io_service.run(); });
        }
    }

    ~ThreadPool() { wait(); }

    void wait() {
        m_work.reset();
        m_threads.join_all();
    }

    template <class F, class... Args> void post(F&& f, Args&&... args) {
        m_io_service.post(
            [f=std::forward<F>(f), args=std::make_tuple(std::forward<Args>(args)...)] {
                std::apply(f, args); 
            });
    }
};

这是 28 行代码,而原始代码为 90 行。它实际上做了 更多的事情

也看到了Live On Coliru

还剩下什么?

我们没有正确处理来自 io_service::run 的异常(参见 Should the exception thrown by boost::asio::io_service::run() be caught?

此外,如果您有“最近”的 Boost,您可以享受到 work 的改进界面(make_work_guard.reset(),因此您不需要 unique_ptr ), 和一个现成的 thread_pool (所以你不再需要......基本上任何东西):

Live On Coliru

#include <boost/asio.hpp>
#include <mutex>
#include <iostream>
static std::mutex s_outputmx;
using namespace std::chrono_literals;

struct HelloWorld {
    std::string const m_str;
    void greetings(int ii) const;
};

int main() {
    boost::asio::thread_pool tp(8);

    for (int ii = 0; ii < 5; ii++)
        //post(tp, [hw=HelloWorld{"Hola mundo"}, ii] { hw.greetings(ii); });
        post(tp, std::bind(&HelloWorld::greetings, HelloWorld{"Hola mundo"}, ii));

    tp.join();
}

void HelloWorld::greetings(int ii) const {
    for (int jj = 0; jj < 5; jj++) {
        std::this_thread::sleep_for(1s);

        std::lock_guard<std::mutex> lk(s_outputmx);
        std::cout << m_str << " " << ii << std::endl;
    }
}