正确使用 std::promise 和 std::future,段错误
Proper usage of std::promise and std::future, segfaults
我一直在小线程池实现上碰碰运气。
然而,在概念化和实施之后,我遇到了瓶颈。
我已经确认工作线程可以正确启动和休眠,而且它们可以正确拾取和执行存储的任务。
但是,我的程序出现段错误 - 我很确定它在 promise.set_value
.
我不确定如何提供一个完整的、可验证的示例(假设我几乎无法上传整个代码)但我会包括这些片段
我相信与这个问题有关。
首先,工人是这样创建的:
worker = [this](){
while(true)
{
std::unique_lock<std::mutex> lock(mStatusMutex); //CV for status updates
mCV.wait(lock);
if(mStatus != Running) //If threadpool status does not imply running
break; //Break out of loop, ending thread in the process
else //If threadpool is in running state
{
lock.unlock(); //Unlock state
while(true) //Loop until no tasks are left
{
mTasksMutex.lock(); //Lock task queue
if(mTasks.empty()) //IF no tasks left, break out of loop and return to waiting
{
mTasksMutex.unlock();
break;
}
else //Else, retrieve a task, unlock the task queue and execute the task
{
std::function<void()> task = mTasks.front();
mTasks.pop();
mTasksMutex.unlock();
task(); //Execute task
}
}
}
}
};
然后像这样启动并存储到std::vector<std::thread>
中:
std::thread tWorker(worker);
mWorkers.push_back(std::move(tWorker));
现在,我认为下面的棘手部分是 adding/executing 任务到任务队列的时间,即 std::queue<std::function<void()>>
。
以下两个函数与此相关:
template<typename RT>
inline std::future<RT> queueTask(std::function<RT()> _task, bool _execute = false)
{
std::promise<RT> promise;
std::function<void()> func([&_task, &promise]() -> RT {
RT val = _task();
promise.set_value(val);
});
mTasksMutex.lock();
mTasks.emplace(func);
mTasksMutex.unlock();
if(_execute) flush();
return promise.get_future();
}
inline void flush()
{
mCV.notify_all();
}
这种方法有什么主要错误吗?
对于任何认为这是一个糟糕问题的人,请随时告诉我如何改进它。
完整代码托管在 my github repo.
主要问题是 promise 已经死了。当 queueTask
完成时,promise
被销毁,任务现在只有一个悬空引用。该任务必须共享 promise
的所有权,以便它能够存活足够长的时间来完成它。
基础 std::function
对象 _task
也是如此,因为您是通过引用捕获它。
您正在使用 std::function
,它需要可复制的对象,因此... shared_ptr
:
template<typename RT>
inline std::future<RT> queueTask(std::function<RT()> _task, bool _execute = false)
{
auto promise = std::make_shared<std::promise<RT>>();
std::function<void()> func([promise, task=std::move(_task)]{
RT val = _task();
promise->set_value(val);
});
{
std::lock_guard<std::mutex> lk(mTasksMutex); // NB: no manual lock()/unlock()!!
mTasks.emplace(func);
}
if(_execute) flush();
return promise->get_future();
}
请考虑 std::packaged_task
。
我一直在小线程池实现上碰碰运气。
然而,在概念化和实施之后,我遇到了瓶颈。
我已经确认工作线程可以正确启动和休眠,而且它们可以正确拾取和执行存储的任务。
但是,我的程序出现段错误 - 我很确定它在 promise.set_value
.
我不确定如何提供一个完整的、可验证的示例(假设我几乎无法上传整个代码)但我会包括这些片段 我相信与这个问题有关。 首先,工人是这样创建的:
worker = [this](){
while(true)
{
std::unique_lock<std::mutex> lock(mStatusMutex); //CV for status updates
mCV.wait(lock);
if(mStatus != Running) //If threadpool status does not imply running
break; //Break out of loop, ending thread in the process
else //If threadpool is in running state
{
lock.unlock(); //Unlock state
while(true) //Loop until no tasks are left
{
mTasksMutex.lock(); //Lock task queue
if(mTasks.empty()) //IF no tasks left, break out of loop and return to waiting
{
mTasksMutex.unlock();
break;
}
else //Else, retrieve a task, unlock the task queue and execute the task
{
std::function<void()> task = mTasks.front();
mTasks.pop();
mTasksMutex.unlock();
task(); //Execute task
}
}
}
}
};
然后像这样启动并存储到std::vector<std::thread>
中:
std::thread tWorker(worker);
mWorkers.push_back(std::move(tWorker));
现在,我认为下面的棘手部分是 adding/executing 任务到任务队列的时间,即 std::queue<std::function<void()>>
。
以下两个函数与此相关:
template<typename RT>
inline std::future<RT> queueTask(std::function<RT()> _task, bool _execute = false)
{
std::promise<RT> promise;
std::function<void()> func([&_task, &promise]() -> RT {
RT val = _task();
promise.set_value(val);
});
mTasksMutex.lock();
mTasks.emplace(func);
mTasksMutex.unlock();
if(_execute) flush();
return promise.get_future();
}
inline void flush()
{
mCV.notify_all();
}
这种方法有什么主要错误吗? 对于任何认为这是一个糟糕问题的人,请随时告诉我如何改进它。 完整代码托管在 my github repo.
主要问题是 promise 已经死了。当 queueTask
完成时,promise
被销毁,任务现在只有一个悬空引用。该任务必须共享 promise
的所有权,以便它能够存活足够长的时间来完成它。
基础 std::function
对象 _task
也是如此,因为您是通过引用捕获它。
您正在使用 std::function
,它需要可复制的对象,因此... shared_ptr
:
template<typename RT>
inline std::future<RT> queueTask(std::function<RT()> _task, bool _execute = false)
{
auto promise = std::make_shared<std::promise<RT>>();
std::function<void()> func([promise, task=std::move(_task)]{
RT val = _task();
promise->set_value(val);
});
{
std::lock_guard<std::mutex> lk(mTasksMutex); // NB: no manual lock()/unlock()!!
mTasks.emplace(func);
}
if(_execute) flush();
return promise->get_future();
}
请考虑 std::packaged_task
。