std::async 指定线程的模拟
std::async analogue for specified thread
我需要处理多个对象,其中每个操作可能需要很长时间。
无法将处理放在我启动它的 GUI(主)线程中。
我需要与一些对象on asynchronous operations, something similar to std::async
with std::future
or QtConcurrent::run()
in my main framework (Qt 5), with QFuture
等进行所有通信,但它不提供线程选择。我需要始终只在一个额外的线程中处理选定的对象(对象 == 设备),
因为:
- 我需要做一个通用的解决方案,不想让每个 class 线程安全
- 例如,甚至如果为QSerialPort做一个线程安全的容器,Serial port in Qt不能在多个线程中访问:
Note: The serial port is always opened with exclusive access (that is, no other process or thread can access an already opened serial port).
- 通常与设备的通信包括发送命令和接收应答。我想准确地在发送请求的地方处理每个答案,并且不想使用仅事件驱动的逻辑。
所以,我的问题。
该功能如何实现?
MyFuture<T> fut = myAsyncStart(func, &specificLiveThread);
一个live thread必须能传多次
是Qt。它的 signal-slot 机制是 thread-aware。在您的辅助 (non-GUI) 线程上,创建一个带有 execute
插槽的 QObject
派生的 class。连接到该插槽的信号会将事件编组到该线程。
请注意,这个 QObject
不能是 GUI 对象的子对象,因为子对象需要存在于它们的父线程中,而这个对象显然不存在于 GUI 线程中。
您可以使用现有的 std::promise
逻辑处理结果,就像 std::future
那样。
让我在不引用 Qt 库的情况下回答,因为我不知道它的线程 API。
在 C++11 标准库中,没有直接的方法来重用创建的线程。线程执行单个功能,只能连接或分离。但是,您可以使用 producer-consumer 模式来实现它。消费者线程需要执行由生产者线程放入队列中的任务(例如表示为 std::function
对象)。因此,如果我是正确的,您需要一个单线程线程池。
我可以推荐我的线程池 C++14 实现作为任务队列。它不常用(目前!)但它包含单元测试并多次使用线程清理器进行检查。文档很少,但请随时在 github 个问题中提出任何问题!
库存储库:https://github.com/Ravirael/concurrentpp
您的用例:
#include <task_queues.hpp>
int main() {
// The single threaded task queue object - creates one additional thread.
concurrent::n_threaded_fifo_task_queue queue(1);
// Add tasks to queue, task is executed in created thread.
std::future<int> future_result = queue.push_with_result([] { return 4; });
// Blocks until task is completed.
int result = future_result.get();
// Executes task on the same thread as before.
std::future<int> second_future_result = queue.push_with_result([] { return 4; });
}
如果您想遵循主动对象方法,这里有一个使用模板的示例:
WorkPackage 及其接口仅用于在向量中存储不同 return 类型的函数(请参阅稍后的 ActiveObject::async
成员函数):
class IWorkPackage {
public:
virtual void execute() = 0;
virtual ~IWorkPackage() {
}
};
template <typename R>
class WorkPackage : public IWorkPackage{
private:
std::packaged_task<R()> task;
public:
WorkPackage(std::packaged_task<R()> t) : task(std::move(t)) {
}
void execute() final {
task();
}
std::future<R> get_future() {
return task.get_future();
}
};
这是 ActiveObject class,它需要您的设备作为模板。此外,它还有一个向量来存储设备的方法请求和一个线程来一个接一个地执行这些方法。最后使用 async 函数向设备请求方法调用:
template <typename Device>
class ActiveObject {
private:
Device servant;
std::thread worker;
std::vector<std::unique_ptr<IWorkPackage>> work_queue;
std::atomic<bool> done;
std::mutex queue_mutex;
std::condition_variable cv;
void worker_thread() {
while(done.load() == false) {
std::unique_ptr<IWorkPackage> wp;
{
std::unique_lock<std::mutex> lck {queue_mutex};
cv.wait(lck, [this] {return !work_queue.empty() || done.load() == true;});
if(done.load() == true) continue;
wp = std::move(work_queue.back());
work_queue.pop_back();
}
if(wp) wp->execute();
}
}
public:
ActiveObject(): done(false) {
worker = std::thread {&ActiveObject::worker_thread, this};
}
~ActiveObject() {
{
std::unique_lock<std::mutex> lck{queue_mutex};
done.store(true);
}
cv.notify_one();
worker.join();
}
template<typename R, typename ...Args, typename ...Params>
std::future<R> async(R (Device::*function)(Params...), Args... args) {
std::unique_ptr<WorkPackage<R>> wp {new WorkPackage<R> {std::packaged_task<R()> { std::bind(function, &servant, args...) }}};
std::future<R> fut = wp->get_future();
{
std::unique_lock<std::mutex> lck{queue_mutex};
work_queue.push_back(std::move(wp));
}
cv.notify_one();
return fut;
}
// In case you want to call some functions directly on the device
Device* operator->() {
return &servant;
}
};
您可以按如下方式使用:
ActiveObject<QSerialPort> ao_serial_port;
// direct call:
ao_serial_port->setReadBufferSize(size);
//async call:
std::future<void> buf_future = ao_serial_port.async(&QSerialPort::setReadBufferSize, size);
std::future<Parity> parity_future = ao_serial_port.async(&QSerialPort::parity);
// Maybe do some other work here
buf_future.get(); // wait until calculations are ready
Parity p = parity_future.get(); // blocks if result not ready yet, i.e. if method has not finished execution yet
编辑以回答评论中的问题:AO 主要是多个 reader/writer 的并发模式。一如既往,它的使用取决于情况。因此,这种模式通常用于分布式 systems/network 应用程序,例如,当多个客户端从服务器请求服务时。客户端受益于 AO 模式,因为它们在等待服务器应答时不会被阻塞。
这种模式在网络应用程序以外的领域中不经常使用的原因之一 可能 是线程开销。为每个活动对象创建一个线程会导致大量线程,如果 CPU 数量较少且同时使用多个活动对象,则会导致线程争用。
我只能猜测为什么人们认为这是一个奇怪的问题:正如您已经发现的那样,它确实需要一些额外的编程。也许这就是原因,但我不确定。
但我认为该模式出于其他原因和用途也非常有用。至于你的例子,主线程(以及其他后台线程)需要来自单例的服务,例如一些设备或硬件接口,它们的数量很少,计算速度慢并且需要并发访问,而不是阻塞等待结果。
我需要处理多个对象,其中每个操作可能需要很长时间。
无法将处理放在我启动它的 GUI(主)线程中。
我需要与一些对象on asynchronous operations, something similar to std::async
with std::future
or QtConcurrent::run()
in my main framework (Qt 5), with QFuture
等进行所有通信,但它不提供线程选择。我需要始终只在一个额外的线程中处理选定的对象(对象 == 设备),
因为:
- 我需要做一个通用的解决方案,不想让每个 class 线程安全
- 例如,甚至如果为QSerialPort做一个线程安全的容器,Serial port in Qt不能在多个线程中访问:
Note: The serial port is always opened with exclusive access (that is, no other process or thread can access an already opened serial port).
- 通常与设备的通信包括发送命令和接收应答。我想准确地在发送请求的地方处理每个答案,并且不想使用仅事件驱动的逻辑。
所以,我的问题。
该功能如何实现?
MyFuture<T> fut = myAsyncStart(func, &specificLiveThread);
一个live thread必须能传多次
是Qt。它的 signal-slot 机制是 thread-aware。在您的辅助 (non-GUI) 线程上,创建一个带有 execute
插槽的 QObject
派生的 class。连接到该插槽的信号会将事件编组到该线程。
请注意,这个 QObject
不能是 GUI 对象的子对象,因为子对象需要存在于它们的父线程中,而这个对象显然不存在于 GUI 线程中。
您可以使用现有的 std::promise
逻辑处理结果,就像 std::future
那样。
让我在不引用 Qt 库的情况下回答,因为我不知道它的线程 API。
在 C++11 标准库中,没有直接的方法来重用创建的线程。线程执行单个功能,只能连接或分离。但是,您可以使用 producer-consumer 模式来实现它。消费者线程需要执行由生产者线程放入队列中的任务(例如表示为 std::function
对象)。因此,如果我是正确的,您需要一个单线程线程池。
我可以推荐我的线程池 C++14 实现作为任务队列。它不常用(目前!)但它包含单元测试并多次使用线程清理器进行检查。文档很少,但请随时在 github 个问题中提出任何问题!
库存储库:https://github.com/Ravirael/concurrentpp
您的用例:
#include <task_queues.hpp>
int main() {
// The single threaded task queue object - creates one additional thread.
concurrent::n_threaded_fifo_task_queue queue(1);
// Add tasks to queue, task is executed in created thread.
std::future<int> future_result = queue.push_with_result([] { return 4; });
// Blocks until task is completed.
int result = future_result.get();
// Executes task on the same thread as before.
std::future<int> second_future_result = queue.push_with_result([] { return 4; });
}
如果您想遵循主动对象方法,这里有一个使用模板的示例:
WorkPackage 及其接口仅用于在向量中存储不同 return 类型的函数(请参阅稍后的 ActiveObject::async
成员函数):
class IWorkPackage {
public:
virtual void execute() = 0;
virtual ~IWorkPackage() {
}
};
template <typename R>
class WorkPackage : public IWorkPackage{
private:
std::packaged_task<R()> task;
public:
WorkPackage(std::packaged_task<R()> t) : task(std::move(t)) {
}
void execute() final {
task();
}
std::future<R> get_future() {
return task.get_future();
}
};
这是 ActiveObject class,它需要您的设备作为模板。此外,它还有一个向量来存储设备的方法请求和一个线程来一个接一个地执行这些方法。最后使用 async 函数向设备请求方法调用:
template <typename Device>
class ActiveObject {
private:
Device servant;
std::thread worker;
std::vector<std::unique_ptr<IWorkPackage>> work_queue;
std::atomic<bool> done;
std::mutex queue_mutex;
std::condition_variable cv;
void worker_thread() {
while(done.load() == false) {
std::unique_ptr<IWorkPackage> wp;
{
std::unique_lock<std::mutex> lck {queue_mutex};
cv.wait(lck, [this] {return !work_queue.empty() || done.load() == true;});
if(done.load() == true) continue;
wp = std::move(work_queue.back());
work_queue.pop_back();
}
if(wp) wp->execute();
}
}
public:
ActiveObject(): done(false) {
worker = std::thread {&ActiveObject::worker_thread, this};
}
~ActiveObject() {
{
std::unique_lock<std::mutex> lck{queue_mutex};
done.store(true);
}
cv.notify_one();
worker.join();
}
template<typename R, typename ...Args, typename ...Params>
std::future<R> async(R (Device::*function)(Params...), Args... args) {
std::unique_ptr<WorkPackage<R>> wp {new WorkPackage<R> {std::packaged_task<R()> { std::bind(function, &servant, args...) }}};
std::future<R> fut = wp->get_future();
{
std::unique_lock<std::mutex> lck{queue_mutex};
work_queue.push_back(std::move(wp));
}
cv.notify_one();
return fut;
}
// In case you want to call some functions directly on the device
Device* operator->() {
return &servant;
}
};
您可以按如下方式使用:
ActiveObject<QSerialPort> ao_serial_port;
// direct call:
ao_serial_port->setReadBufferSize(size);
//async call:
std::future<void> buf_future = ao_serial_port.async(&QSerialPort::setReadBufferSize, size);
std::future<Parity> parity_future = ao_serial_port.async(&QSerialPort::parity);
// Maybe do some other work here
buf_future.get(); // wait until calculations are ready
Parity p = parity_future.get(); // blocks if result not ready yet, i.e. if method has not finished execution yet
编辑以回答评论中的问题:AO 主要是多个 reader/writer 的并发模式。一如既往,它的使用取决于情况。因此,这种模式通常用于分布式 systems/network 应用程序,例如,当多个客户端从服务器请求服务时。客户端受益于 AO 模式,因为它们在等待服务器应答时不会被阻塞。
这种模式在网络应用程序以外的领域中不经常使用的原因之一 可能 是线程开销。为每个活动对象创建一个线程会导致大量线程,如果 CPU 数量较少且同时使用多个活动对象,则会导致线程争用。
我只能猜测为什么人们认为这是一个奇怪的问题:正如您已经发现的那样,它确实需要一些额外的编程。也许这就是原因,但我不确定。
但我认为该模式出于其他原因和用途也非常有用。至于你的例子,主线程(以及其他后台线程)需要来自单例的服务,例如一些设备或硬件接口,它们的数量很少,计算速度慢并且需要并发访问,而不是阻塞等待结果。