std::packaged_task 和 std::placeholders
std::packaged_task with std::placeholders
主要编辑以简化代码(并解决)
我希望能够制作一个具有自由未绑定参数的打包任务,然后我将在打包任务的调用时添加它。
在这种情况下,我希望函数(size_t
类型)的第一个参数未绑定。
这是一个最小的工作示例(这是解决方案):
#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>
#include <cstdlib>
#include <cstdio>
//REV: I'm trying to "trick" this into for double testfunct( size_t arg1, double arg2), take enqueue( testfunct, 1.0 ), and then internally, execute
// return testfunct( internal_size_t, 1.0 )
template<typename F, typename... Args>
auto enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(size_t, Args...)>::type>
{
using return_type = typename std::result_of<F(size_t, Args...)>::type;
//REV: this is where the error was, I was being stupid and thinking this task_contents which would be pushed to the queue should be same (return?) type as original function? Changed to auto and everything worked... (taking into account Jans's result_type(size_t) advice into account.
//std::function<void(size_t)> task_contents = std::bind( std::forward<F>(f), std::placeholders::_1, std::forward<Args>(args)... );
auto task_contents = std::bind( std::forward<F>(f), std::placeholders::_1, std::forward<Args>(args)... );
std::packaged_task<return_type(size_t)> rawtask(
task_contents );
std::future<return_type> res = rawtask.get_future();
size_t arbitrary_i = 10;
rawtask(arbitrary_i);
return res;
}
double testfunct( size_t threadidx, double& arg1 )
{
fprintf(stdout, "Double %lf Executing on thread %ld\n", arg1, threadidx );
std::this_thread::sleep_for( std::chrono::milliseconds(1000) );
return 10; //true;
}
int main()
{
std::vector<std::future<double>> myfutures;
for(size_t x=0; x<100; ++x)
{
double a=x*10;
myfutures.push_back(
enqueue( testfunct, std::ref(a) )
);
}
for(size_t x=0; x<100; ++x)
{
double r = myfutures[x].get();
fprintf(stdout, "Got %ld: %f\n", x, r );
}
}
主要问题在ThreadPool::enqueue
:
std::function<void(size_t)> task1 = std::bind( std::forward<F>(f), std::placeholders::_1, std::forward<Args>(args)... );
在这里,task1
的类型是 std::function<void(std::size_t)>
,但是当用 funct
求值时 std::bind
的结果可以转换为 std::function<bool(std::size_t)>
,即使正如@T.C 所指出的,您可以将 bind
的结果分配给 task1
,为了将 task1
传递给 std::make_shared
,您需要遵守return_type
你有。
将以上内容改为:
std::function<return_type(size_t)> task1 = std::bind( std::forward<F>(f), std::placeholders::_1, std::forward<Args>(args)... );
现在同样适用于:
auto task = std::make_shared< std::packaged_task<return_type()> >( task1 );
但在这种情况下是缺少参数类型。将其更改为:
auto task = std::make_shared< std::packaged_task<return_type(std::size_t)> >( task1 );
ThreadPool::tasks
存储类型为 std::function<void(std::size_t)>
的函数对象,但您正在存储不接收任何参数的 lambda。将 tasks.emplace(...)
更改为:
tasks.emplace([task](std::size_t a){ (*task)(a); });
代码格式不是很好,但是有解决办法。
首先,您应该将结果包装在 lambda 创建中,而不是传递可以 return 任何东西的函数。但是如果你想在一个任务上使用共享指针,这是可行的。
原型中:
std::future<void> enqueue(std::function<void(size_t)> f);
using Task = std::function<void(size_t)>;
// the task queue
std::queue<Task> tasks;
std::optional<Task> pop_one();
实施变为:
ThreadPool::ThreadPool(size_t threads)
: stop(false)
{
for(size_t i = 0;i<threads;++i)
workers.emplace_back(
[this,i]
{
for(;;)
{
auto task = pop_one();
if(task)
{
(*task)(i);
}
else break;
}
}
);
}
std::optional<ThreadPool::Task> ThreadPool::pop_one()
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(lock,
[this]{ return this->stop || !this->tasks.empty(); });
if(this->stop && this->tasks.empty())
{
return std::optional<Task>();
}
auto task = std::move(this->tasks.front()); //REV: this moves into my thread the front of the tasks queue.
this->tasks.pop();
return task;
}
template<typename T>
std::future<T> ThreadPool::enqueue(std::function<T(size_t)> fun)
{
auto task = std::make_shared< std::packaged_task<T(size_t)> >([=](size_t size){return fun(size);});
auto res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
// don't allow enqueueing after stopping the pool
if(stop)
{
throw std::runtime_error("enqueue on stopped ThreadPool");
}
tasks.emplace([=](size_t size){(*task)(size);});
}
condition.notify_one();
return res;
}
现在你可以拥有你的主菜了:
int main()
{
size_t nthreads=3;
ThreadPool tp(nthreads);
std::vector<std::future<bool>> myfutures;
for(size_t x=0; x<100; ++x)
{
myfutures.push_back(
tp.enqueue<bool>([=](size_t threadidx) {return funct(threadidx, (double)x * 10.0);}));
}
for(size_t x=0; x<100; ++x)
{
bool r = myfutures[x].get();
std::cout << "Got " << r << "\n";
}
}
包装 lambda 时现在有一个明确的 return 类型,因为 return 类型是模板化的。
主要编辑以简化代码(并解决)
我希望能够制作一个具有自由未绑定参数的打包任务,然后我将在打包任务的调用时添加它。
在这种情况下,我希望函数(size_t
类型)的第一个参数未绑定。
这是一个最小的工作示例(这是解决方案):
#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>
#include <cstdlib>
#include <cstdio>
//REV: I'm trying to "trick" this into for double testfunct( size_t arg1, double arg2), take enqueue( testfunct, 1.0 ), and then internally, execute
// return testfunct( internal_size_t, 1.0 )
template<typename F, typename... Args>
auto enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(size_t, Args...)>::type>
{
using return_type = typename std::result_of<F(size_t, Args...)>::type;
//REV: this is where the error was, I was being stupid and thinking this task_contents which would be pushed to the queue should be same (return?) type as original function? Changed to auto and everything worked... (taking into account Jans's result_type(size_t) advice into account.
//std::function<void(size_t)> task_contents = std::bind( std::forward<F>(f), std::placeholders::_1, std::forward<Args>(args)... );
auto task_contents = std::bind( std::forward<F>(f), std::placeholders::_1, std::forward<Args>(args)... );
std::packaged_task<return_type(size_t)> rawtask(
task_contents );
std::future<return_type> res = rawtask.get_future();
size_t arbitrary_i = 10;
rawtask(arbitrary_i);
return res;
}
double testfunct( size_t threadidx, double& arg1 )
{
fprintf(stdout, "Double %lf Executing on thread %ld\n", arg1, threadidx );
std::this_thread::sleep_for( std::chrono::milliseconds(1000) );
return 10; //true;
}
int main()
{
std::vector<std::future<double>> myfutures;
for(size_t x=0; x<100; ++x)
{
double a=x*10;
myfutures.push_back(
enqueue( testfunct, std::ref(a) )
);
}
for(size_t x=0; x<100; ++x)
{
double r = myfutures[x].get();
fprintf(stdout, "Got %ld: %f\n", x, r );
}
}
主要问题在ThreadPool::enqueue
:
std::function<void(size_t)> task1 = std::bind( std::forward<F>(f), std::placeholders::_1, std::forward<Args>(args)... );
在这里,task1
的类型是 std::function<void(std::size_t)>
,但是当用 funct
求值时 std::bind
的结果可以转换为 std::function<bool(std::size_t)>
,即使正如@T.C 所指出的,您可以将 bind
的结果分配给 task1
,为了将 task1
传递给 std::make_shared
,您需要遵守return_type
你有。
将以上内容改为:
std::function<return_type(size_t)> task1 = std::bind( std::forward<F>(f), std::placeholders::_1, std::forward<Args>(args)... );
现在同样适用于:
auto task = std::make_shared< std::packaged_task<return_type()> >( task1 );
但在这种情况下是缺少参数类型。将其更改为:
auto task = std::make_shared< std::packaged_task<return_type(std::size_t)> >( task1 );
ThreadPool::tasks
存储类型为 std::function<void(std::size_t)>
的函数对象,但您正在存储不接收任何参数的 lambda。将 tasks.emplace(...)
更改为:
tasks.emplace([task](std::size_t a){ (*task)(a); });
代码格式不是很好,但是有解决办法。
首先,您应该将结果包装在 lambda 创建中,而不是传递可以 return 任何东西的函数。但是如果你想在一个任务上使用共享指针,这是可行的。
原型中:
std::future<void> enqueue(std::function<void(size_t)> f);
using Task = std::function<void(size_t)>;
// the task queue
std::queue<Task> tasks;
std::optional<Task> pop_one();
实施变为:
ThreadPool::ThreadPool(size_t threads)
: stop(false)
{
for(size_t i = 0;i<threads;++i)
workers.emplace_back(
[this,i]
{
for(;;)
{
auto task = pop_one();
if(task)
{
(*task)(i);
}
else break;
}
}
);
}
std::optional<ThreadPool::Task> ThreadPool::pop_one()
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(lock,
[this]{ return this->stop || !this->tasks.empty(); });
if(this->stop && this->tasks.empty())
{
return std::optional<Task>();
}
auto task = std::move(this->tasks.front()); //REV: this moves into my thread the front of the tasks queue.
this->tasks.pop();
return task;
}
template<typename T>
std::future<T> ThreadPool::enqueue(std::function<T(size_t)> fun)
{
auto task = std::make_shared< std::packaged_task<T(size_t)> >([=](size_t size){return fun(size);});
auto res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
// don't allow enqueueing after stopping the pool
if(stop)
{
throw std::runtime_error("enqueue on stopped ThreadPool");
}
tasks.emplace([=](size_t size){(*task)(size);});
}
condition.notify_one();
return res;
}
现在你可以拥有你的主菜了:
int main()
{
size_t nthreads=3;
ThreadPool tp(nthreads);
std::vector<std::future<bool>> myfutures;
for(size_t x=0; x<100; ++x)
{
myfutures.push_back(
tp.enqueue<bool>([=](size_t threadidx) {return funct(threadidx, (double)x * 10.0);}));
}
for(size_t x=0; x<100; ++x)
{
bool r = myfutures[x].get();
std::cout << "Got " << r << "\n";
}
}
包装 lambda 时现在有一个明确的 return 类型,因为 return 类型是模板化的。