C++串行线程执行器
C++ serial thread executor
来自 Java 环境(特别是 Android)在不阻塞主线程的情况下,我在新线程中执行某些代码时从未遇到过任何问题。
因为我现在必须使用 C++,所以我偶然发现了以下问题。
客户端代码通过 JNI 执行我的本机 (C++) 代码:
JNIEXPORT jbyteArray JNICALL
Java_com_native_project_NativeInterface_processData(JNIEnv *env, jobject instance, jbyteArray inputData_) {
vector<unsigned char> inputData = jbyteArrayToBytes(env, inputData_);
const vector<unsigned char> &result = getDataProcessor(env, instance).processData(inputData);
return bytesTojbyteArray(env, result);
}
DataProcessor getDataProcessor(JNIEnv *env, jobject instance) {
return DataProcessor(env, instance);
}
然后在我的 DataProcessor
中我想做两件事:
- 处理数据并return尽快
- 将数据写入 activity 日志(例如数据库)而不延迟响应(因此首先 return 响应然后记录数据)
示例代码:
class BasicAsync {
private:
void logToDB(const vector<unsigned char> &inputData) {
// connect to DB and write data to it
}
vector<unsigned char> compute(const vector<unsigned char> &inputData) {
vector<unsigned char> result = vector<unsigned char>();
// rocket-science computation in here
return result;
}
public:
vector<unsigned char> processData(const vector<unsigned char> &inputData) {
// perform data computation and produce output
vector<unsigned char> result = compute(inputData);
// create a thread that writes the data to activity log without delaying the response return
logToDB(inputData);
//return result while data is written to activity log
return result;
}
}
我主要担心的是:
- 在 C++ 中是否可行(我使用的是 C++ 11)?
- 如果将数据写入数据库需要一段时间,那段时间
DataProcessor
对象会发生什么情况(因为它应该在 return 通过 JNI 响应后被销毁,因为它的生命周期结束- 也许我在这里遗漏了什么)?
- 是否有任何串行线程执行器,以便我可以将一些东西写入数据库(它们将被放入 FIFO 队列并在同一线程中按顺序保存)?
在 2011 年之前,人们必须直接使用 pthreads 等原生 API 或部署第三方包装器库,例如来自 boost 的库,但自 2011 年以来,C++ 提供了一个相当丰富的标准化 thread 接口。
也许你先自己看看,试试看,然后在 post 中添加更具体的问题;然后我会相应地扩展这个答案。
- 是
- 考虑 Active Object Pattern 的一个变体来实现串行执行队列来完成必须串行化的长 运行 工作:
#include <thread>
#include <mutex>
#include <functional>
#include <queue>
class SerialExecutionQueue
{
public:
typedef std::function<void()> QueueItem;
SerialExecutionQueue() :
m_shouldQuit(false),
m_executor([this]()
{
executor();
})
{
}
void enqueueWork(QueueItem item)
{
{
std::lock_guard<std::mutex> l(m_mutex);
m_queue.push(item);
}
m_cv.notify_all();
}
void executor()
{
while (!m_shouldQuit)
{
std::unique_lock<std::mutex> lock(m_mutex);
while (m_queue.size())
{
auto item = m_queue.front();
m_queue.pop();
m_mutex.unlock();
item();
m_mutex.lock();
}
m_cv.wait(lock);
}
}
private:
bool m_shouldQuit;
std::condition_variable m_cv;
std::mutex m_mutex;
std::queue<QueueItem> m_queue;
std::thread m_executor;
};
int main(int argc, const char * argv[])
{
SerialExecutionQueue queue;
queue.enqueueWork([]()
{
std::cout << "Did some work 1" <<std::endl;
});
queue.enqueueWork([]()
{
std::cout << "Did some work 2" <<std::endl;
});
sleep(1);
queue.enqueueWork([]()
{
std::cout << "Did some work 3" <<std::endl;
});
sleep(10);
return 0;
}
输出:
Did some work 1
Did some work 2
Did some work 3
来自 Java 环境(特别是 Android)在不阻塞主线程的情况下,我在新线程中执行某些代码时从未遇到过任何问题。 因为我现在必须使用 C++,所以我偶然发现了以下问题。
客户端代码通过 JNI 执行我的本机 (C++) 代码:
JNIEXPORT jbyteArray JNICALL
Java_com_native_project_NativeInterface_processData(JNIEnv *env, jobject instance, jbyteArray inputData_) {
vector<unsigned char> inputData = jbyteArrayToBytes(env, inputData_);
const vector<unsigned char> &result = getDataProcessor(env, instance).processData(inputData);
return bytesTojbyteArray(env, result);
}
DataProcessor getDataProcessor(JNIEnv *env, jobject instance) {
return DataProcessor(env, instance);
}
然后在我的 DataProcessor
中我想做两件事:
- 处理数据并return尽快
- 将数据写入 activity 日志(例如数据库)而不延迟响应(因此首先 return 响应然后记录数据)
示例代码:
class BasicAsync {
private:
void logToDB(const vector<unsigned char> &inputData) {
// connect to DB and write data to it
}
vector<unsigned char> compute(const vector<unsigned char> &inputData) {
vector<unsigned char> result = vector<unsigned char>();
// rocket-science computation in here
return result;
}
public:
vector<unsigned char> processData(const vector<unsigned char> &inputData) {
// perform data computation and produce output
vector<unsigned char> result = compute(inputData);
// create a thread that writes the data to activity log without delaying the response return
logToDB(inputData);
//return result while data is written to activity log
return result;
}
}
我主要担心的是:
- 在 C++ 中是否可行(我使用的是 C++ 11)?
- 如果将数据写入数据库需要一段时间,那段时间
DataProcessor
对象会发生什么情况(因为它应该在 return 通过 JNI 响应后被销毁,因为它的生命周期结束- 也许我在这里遗漏了什么)? - 是否有任何串行线程执行器,以便我可以将一些东西写入数据库(它们将被放入 FIFO 队列并在同一线程中按顺序保存)?
在 2011 年之前,人们必须直接使用 pthreads 等原生 API 或部署第三方包装器库,例如来自 boost 的库,但自 2011 年以来,C++ 提供了一个相当丰富的标准化 thread 接口。
也许你先自己看看,试试看,然后在 post 中添加更具体的问题;然后我会相应地扩展这个答案。
- 是
- 考虑 Active Object Pattern 的一个变体来实现串行执行队列来完成必须串行化的长 运行 工作:
#include <thread>
#include <mutex>
#include <functional>
#include <queue>
class SerialExecutionQueue
{
public:
typedef std::function<void()> QueueItem;
SerialExecutionQueue() :
m_shouldQuit(false),
m_executor([this]()
{
executor();
})
{
}
void enqueueWork(QueueItem item)
{
{
std::lock_guard<std::mutex> l(m_mutex);
m_queue.push(item);
}
m_cv.notify_all();
}
void executor()
{
while (!m_shouldQuit)
{
std::unique_lock<std::mutex> lock(m_mutex);
while (m_queue.size())
{
auto item = m_queue.front();
m_queue.pop();
m_mutex.unlock();
item();
m_mutex.lock();
}
m_cv.wait(lock);
}
}
private:
bool m_shouldQuit;
std::condition_variable m_cv;
std::mutex m_mutex;
std::queue<QueueItem> m_queue;
std::thread m_executor;
};
int main(int argc, const char * argv[])
{
SerialExecutionQueue queue;
queue.enqueueWork([]()
{
std::cout << "Did some work 1" <<std::endl;
});
queue.enqueueWork([]()
{
std::cout << "Did some work 2" <<std::endl;
});
sleep(1);
queue.enqueueWork([]()
{
std::cout << "Did some work 3" <<std::endl;
});
sleep(10);
return 0;
}
输出:
Did some work 1
Did some work 2
Did some work 3