在线程之间传输函数调用
Transporting function calls between threads
我面临着一项非常具有挑战性的任务。这说起来容易,做起来难,我真的不知道该怎么做。也许有人知道更简单的方法:
我想将函数调用的参数(堆栈和寄存器)保存到堆中,然后在另一个线程中恢复这些参数。
假设函数如下:
int worker(int p1, int p2, ...) // variadic
{
return enq(); // will pack the parameters and return a Job structure
... // some heavy work that must be executed by another thread
}
我的起点是两个结构,第一个保存当前栈帧
struct StackFrame
{
struct StackFrame *next;
void *returnAddr;
};
而第二个保存的是保存的参数和工人的返回点
struct Job
{
void *registers[];
size_t regCount;
void *stackFrame;
size_t frameSize;
void *workerAddr;
};
现在,函数 enq()
它将打包 worker 参数,并使用线程池(已经工作)将 Job 入队。我预见到这样的事情:
Job* enq()
{
// get the caller stackFrame
register struct StackFrame *fp __builtin_frame_address(1);
// save the stack parameters of the caller to the heap,
Job *job = new Job;
job->frameSize = frame->next - frame;
job->frameContent = malloc (job->frameSize);
memcpy (job->stackContent, frame, job->frameSize );
job->workerAddr = frame.workerAddr // to where the worker Thread will jump
// !! I'm stuck here !!
// copy all the registers to memory (ideally only the used as parameters)
job.registers = ... // in i32 there was an instruction called PUSHA, but not on i64
return job // real, threadPool.push(job)
}
现在,在 worker 端,deq()
函数将执行与 enq()
相反的操作,如下所示:
void deq(Job *job)
{
// real, Job *job = threadPool.pop()
// restore the registers parameters
POPA(job->registers, job->regCount) // just like (i32 POPA)
// restore the stack frame.
push(job->frameContent, job->frameSize)
// execute the worker
call(job->workerAddr);
// mark the Job as done
}
在客户端,我想像这样调用这个函数:
Job* promise = worker(1, 2, "a variadic param");
wait(promise); // or wait(promise, callback)
enq()
函数必须从 worker 函数内部将调用者的参数打包到 Job 结构中。
wait()
函数不是一个真正的问题,这里是为了举例说明整个事情必须如何工作。
这就是我的全部。
你知道如何解决这些缺失的步骤,并帮助我更接近我的意图吗?或者更好的是,一种更简单、更高级的方法来做到这一点?
我在 Ubuntu 19 64 位上使用 GCC 9.2.1。
前段时间我遇到了类似的问题。在我的例子中,我需要调用的函数都是无效的。所以我所做的是将 std::bind
与 parameter pack templates and store the result in a std::vector<std::function<void()>>
. To return a specific type you could also return a std::future<T>
as you enqueue the "Jobs". To return a random type you might be able to return a std::future<std::any>
and leave it to the caller to cast it to the correct type. You can take a look at my implementation here.
一起使用
这是一个快速示例解决方案,它可以满足您的需求(请注意,它使用 Visual Studio 内联汇编语法,因此它可能与 GCC 的语法略有不同。它适用于具有任意数量参数的函数,但您如果你想的话,你必须找到一种从被调用函数中获取 return 类型的方法(如果它们都是 return 相同类型的值,那将是微不足道的,但天空是极限)。 =11=]
#include <iostream>
#include <vector>
#include <memory>
using namespace std;
template <class T>
vector<pair<size_t, void*>> PackParams(T param) {
vector<pair<size_t, void*>> ret;
T* paramMem = new T;
memcpy(paramMem, ¶m, sizeof(T));
ret.push_back(pair<size_t, void*>(sizeof(T), paramMem));
return ret;
}
template <class T, typename... Targs>
vector<pair<size_t, void*>> PackParams(T param, Targs... otherParams) {
vector<pair<size_t, void*>> ret;
T* paramMem = new T;
memcpy(paramMem, ¶m, sizeof(T));
ret.push_back(pair<size_t, void*>(sizeof(T), paramMem));
vector<pair<size_t, void*>> otherPack = PackParams(otherParams...);
for (int i = 0; i < otherPack.size(); ++i) {
ret.push_back(otherPack[i]);
}
return ret;
}
vector<pair<size_t, void*>> PackParams(void) {
vector<pair<size_t, void*>> ret;
return ret;
}
pair<size_t, void*> AlignParams(vector<pair<size_t, void*>> params) {
int totalSize = 0;
for (int i = 0; i < params.size(); ++i) {
totalSize += params[i].first;
}
char* paramBlock = new char[totalSize];
totalSize = 0;
for (int i = 0; i < params.size(); ++i) {
memcpy(¶mBlock[totalSize], params[i].second, params[i].first);
totalSize += params[i].first;
}
return pair<size_t, void*>(totalSize, paramBlock);
}
int test1(int a, int b) {
cout << a + b << endl;
return a + b;
}
void Call(void* pFunc, void* params, int paramSize) {
_asm {
mov edx, paramSize
mov ebx, params
xor ecx,ecx
loop1:
push dword ptr [ebx + ecx]
add ecx, 4
cmp ecx,paramSize
jl loop1
call pFunc
add esp, paramSize
nop
}
}
int main() {
vector<pair<size_t, void*>> r = PackParams(5, 6);
pair<size_t,void*> paramData = AlignParams(r);
Call(test1, paramData.second, paramData.first);
//system("pause");
return 0;
}
将paramData和函数指针传递给线程,它们就可以使用Call函数了。
警告:
您将必须实施正确的 AlignParams 函数以确保实现 4 字节、8 字节或任何对齐,具体取决于系统架构的要求。
此答案符合 x86 stdcall 调用约定。其他调用约定需要不同的汇编,尤其是 x64,它在某些情况下有一些根本性的偏离。
此外,为了快速为您提供解决方案,我省略了一些基本的内存处理(没有删除动态分配的内存等)。这只是一个示范性解决方案,旨在向您大体展示您需要做什么才能获得更强大的解决方案。
当然,您也可以将 Call、PackParams 和 AlignParams 组合成一个可变参数模板函数,以简化语法。
这是一个非常简单有效的解决方案,可以实现您的 enq 和 deq(为简洁起见,进行了一些小改动)。
我留下了一个早期的内联汇编解决方案,因为你提到了寄存器和堆栈,但这个解决方案不需要任何汇编,并且适用于具有任何数量和类型参数的任何函数。
为了快速回顾,"stack and registers" 存储在 ThreadFunction 实例中。您只需使用 ThreadFunction::Call 来调用带有存储参数的存储函数。 main 函数非常简单,仅对一些线程使用 enq 和 deq,但您可以使用 GetThreadInvokable 将函数及其参数打包到 ThreadFunction 对象中,然后您可以在需要时将其入队。
#include <iostream>
#include <vector>
#include <memory>
#include <thread>
#include <functional>
#include <mutex>
using namespace std;
struct ThreadFunction {
virtual void Call() = 0;
};
template <typename F, typename ... Args>
struct ThreadFunctionPacked : public ThreadFunction {
std::function<void(void)> m_lambda;
ThreadFunctionPacked(F pFunc, Args ... args) {
m_lambda = [pFunc, args...]() {
pFunc(args...);
};
}
virtual void Call() {
m_lambda();
}
};
template <typename F, typename ... Args>
ThreadFunction* GetThreadInvokable(F pFunc, Args... args) {
ThreadFunction* ret = (ThreadFunction*) new ThreadFunctionPacked<F, Args...> (pFunc, args...);
return ret;
}
struct Job {
ThreadFunction* m_funcAndArgs = NULL;
Job(ThreadFunction* p) {
m_funcAndArgs = p;
}
void Run() {
m_funcAndArgs->Call();
}
};
std::mutex mutexJobs;
std::vector<Job*> jobs;
std::mutex mutexConsole;
template <typename F, typename ... Args>
void enq(F pFunc, Args... args) {
std::lock_guard<std::mutex> lock(mutexJobs);
jobs.push_back(new Job(GetThreadInvokable(pFunc, args...)));
}
void deq() {
Job* job = NULL;
{
std::lock_guard<std::mutex> lock(mutexJobs);
if (jobs.empty()) {
return;
}
job = jobs[0];
jobs.erase(jobs.begin());
}
if (job != NULL) {
job->Run();
}
}
void testAdd(int a, int b) {
std::lock_guard<std::mutex> lock(mutexConsole);
cout << a + b << endl;
}
void testMinus(int a, int b) {
std::lock_guard<std::mutex> lock(mutexConsole);
cout << a - b << endl;
}
void testVoid() {
std::lock_guard<std::mutex> lock(mutexConsole);
cout << "Void function" << endl;
}
void testPrint(std::string str) {
std::lock_guard<std::mutex> lock(mutexConsole);
cout << str << endl;
}
void thread1Func() {
deq();
deq();
deq();
}
void thread2Func() {
deq();
deq();
deq();
}
int main() {
enq(testAdd, 5, 3);
enq(testAdd, 10, 50);
enq(testMinus, 7, 20);
enq(testVoid);
enq(testPrint, "Hello");
std::thread t1(thread1Func);
std::thread t2(thread2Func);
t1.join();
t2.join();
return 0;
}
如您所见,我使用了几个互斥锁来进行概念验证线程处理,但是您应该按照计划在解决方案中使用线程池来使其更加健壮,并且还要注意我没有实现任何析构函数或删除您当然应该做的动态内存(将指针包装在 std::unique_ptr 中就足够了)。
我会保留我之前的解决方案,以防有人想涉足寄存器和堆栈管理,但我认为您会对这个解决方案感到最满意。
我面临着一项非常具有挑战性的任务。这说起来容易,做起来难,我真的不知道该怎么做。也许有人知道更简单的方法:
我想将函数调用的参数(堆栈和寄存器)保存到堆中,然后在另一个线程中恢复这些参数。
假设函数如下:
int worker(int p1, int p2, ...) // variadic
{
return enq(); // will pack the parameters and return a Job structure
... // some heavy work that must be executed by another thread
}
我的起点是两个结构,第一个保存当前栈帧
struct StackFrame
{
struct StackFrame *next;
void *returnAddr;
};
而第二个保存的是保存的参数和工人的返回点
struct Job
{
void *registers[];
size_t regCount;
void *stackFrame;
size_t frameSize;
void *workerAddr;
};
现在,函数 enq()
它将打包 worker 参数,并使用线程池(已经工作)将 Job 入队。我预见到这样的事情:
Job* enq()
{
// get the caller stackFrame
register struct StackFrame *fp __builtin_frame_address(1);
// save the stack parameters of the caller to the heap,
Job *job = new Job;
job->frameSize = frame->next - frame;
job->frameContent = malloc (job->frameSize);
memcpy (job->stackContent, frame, job->frameSize );
job->workerAddr = frame.workerAddr // to where the worker Thread will jump
// !! I'm stuck here !!
// copy all the registers to memory (ideally only the used as parameters)
job.registers = ... // in i32 there was an instruction called PUSHA, but not on i64
return job // real, threadPool.push(job)
}
现在,在 worker 端,deq()
函数将执行与 enq()
相反的操作,如下所示:
void deq(Job *job)
{
// real, Job *job = threadPool.pop()
// restore the registers parameters
POPA(job->registers, job->regCount) // just like (i32 POPA)
// restore the stack frame.
push(job->frameContent, job->frameSize)
// execute the worker
call(job->workerAddr);
// mark the Job as done
}
在客户端,我想像这样调用这个函数:
Job* promise = worker(1, 2, "a variadic param");
wait(promise); // or wait(promise, callback)
enq()
函数必须从 worker 函数内部将调用者的参数打包到 Job 结构中。
wait()
函数不是一个真正的问题,这里是为了举例说明整个事情必须如何工作。
这就是我的全部。
你知道如何解决这些缺失的步骤,并帮助我更接近我的意图吗?或者更好的是,一种更简单、更高级的方法来做到这一点?
我在 Ubuntu 19 64 位上使用 GCC 9.2.1。
前段时间我遇到了类似的问题。在我的例子中,我需要调用的函数都是无效的。所以我所做的是将 std::bind
与 parameter pack templates and store the result in a std::vector<std::function<void()>>
. To return a specific type you could also return a std::future<T>
as you enqueue the "Jobs". To return a random type you might be able to return a std::future<std::any>
and leave it to the caller to cast it to the correct type. You can take a look at my implementation here.
这是一个快速示例解决方案,它可以满足您的需求(请注意,它使用 Visual Studio 内联汇编语法,因此它可能与 GCC 的语法略有不同。它适用于具有任意数量参数的函数,但您如果你想的话,你必须找到一种从被调用函数中获取 return 类型的方法(如果它们都是 return 相同类型的值,那将是微不足道的,但天空是极限)。 =11=]
#include <iostream>
#include <vector>
#include <memory>
using namespace std;
template <class T>
vector<pair<size_t, void*>> PackParams(T param) {
vector<pair<size_t, void*>> ret;
T* paramMem = new T;
memcpy(paramMem, ¶m, sizeof(T));
ret.push_back(pair<size_t, void*>(sizeof(T), paramMem));
return ret;
}
template <class T, typename... Targs>
vector<pair<size_t, void*>> PackParams(T param, Targs... otherParams) {
vector<pair<size_t, void*>> ret;
T* paramMem = new T;
memcpy(paramMem, ¶m, sizeof(T));
ret.push_back(pair<size_t, void*>(sizeof(T), paramMem));
vector<pair<size_t, void*>> otherPack = PackParams(otherParams...);
for (int i = 0; i < otherPack.size(); ++i) {
ret.push_back(otherPack[i]);
}
return ret;
}
vector<pair<size_t, void*>> PackParams(void) {
vector<pair<size_t, void*>> ret;
return ret;
}
pair<size_t, void*> AlignParams(vector<pair<size_t, void*>> params) {
int totalSize = 0;
for (int i = 0; i < params.size(); ++i) {
totalSize += params[i].first;
}
char* paramBlock = new char[totalSize];
totalSize = 0;
for (int i = 0; i < params.size(); ++i) {
memcpy(¶mBlock[totalSize], params[i].second, params[i].first);
totalSize += params[i].first;
}
return pair<size_t, void*>(totalSize, paramBlock);
}
int test1(int a, int b) {
cout << a + b << endl;
return a + b;
}
void Call(void* pFunc, void* params, int paramSize) {
_asm {
mov edx, paramSize
mov ebx, params
xor ecx,ecx
loop1:
push dword ptr [ebx + ecx]
add ecx, 4
cmp ecx,paramSize
jl loop1
call pFunc
add esp, paramSize
nop
}
}
int main() {
vector<pair<size_t, void*>> r = PackParams(5, 6);
pair<size_t,void*> paramData = AlignParams(r);
Call(test1, paramData.second, paramData.first);
//system("pause");
return 0;
}
将paramData和函数指针传递给线程,它们就可以使用Call函数了。
警告:
您将必须实施正确的 AlignParams 函数以确保实现 4 字节、8 字节或任何对齐,具体取决于系统架构的要求。
此答案符合 x86 stdcall 调用约定。其他调用约定需要不同的汇编,尤其是 x64,它在某些情况下有一些根本性的偏离。
此外,为了快速为您提供解决方案,我省略了一些基本的内存处理(没有删除动态分配的内存等)。这只是一个示范性解决方案,旨在向您大体展示您需要做什么才能获得更强大的解决方案。
当然,您也可以将 Call、PackParams 和 AlignParams 组合成一个可变参数模板函数,以简化语法。
这是一个非常简单有效的解决方案,可以实现您的 enq 和 deq(为简洁起见,进行了一些小改动)。
我留下了一个早期的内联汇编解决方案,因为你提到了寄存器和堆栈,但这个解决方案不需要任何汇编,并且适用于具有任何数量和类型参数的任何函数。
为了快速回顾,"stack and registers" 存储在 ThreadFunction 实例中。您只需使用 ThreadFunction::Call 来调用带有存储参数的存储函数。 main 函数非常简单,仅对一些线程使用 enq 和 deq,但您可以使用 GetThreadInvokable 将函数及其参数打包到 ThreadFunction 对象中,然后您可以在需要时将其入队。
#include <iostream>
#include <vector>
#include <memory>
#include <thread>
#include <functional>
#include <mutex>
using namespace std;
struct ThreadFunction {
virtual void Call() = 0;
};
template <typename F, typename ... Args>
struct ThreadFunctionPacked : public ThreadFunction {
std::function<void(void)> m_lambda;
ThreadFunctionPacked(F pFunc, Args ... args) {
m_lambda = [pFunc, args...]() {
pFunc(args...);
};
}
virtual void Call() {
m_lambda();
}
};
template <typename F, typename ... Args>
ThreadFunction* GetThreadInvokable(F pFunc, Args... args) {
ThreadFunction* ret = (ThreadFunction*) new ThreadFunctionPacked<F, Args...> (pFunc, args...);
return ret;
}
struct Job {
ThreadFunction* m_funcAndArgs = NULL;
Job(ThreadFunction* p) {
m_funcAndArgs = p;
}
void Run() {
m_funcAndArgs->Call();
}
};
std::mutex mutexJobs;
std::vector<Job*> jobs;
std::mutex mutexConsole;
template <typename F, typename ... Args>
void enq(F pFunc, Args... args) {
std::lock_guard<std::mutex> lock(mutexJobs);
jobs.push_back(new Job(GetThreadInvokable(pFunc, args...)));
}
void deq() {
Job* job = NULL;
{
std::lock_guard<std::mutex> lock(mutexJobs);
if (jobs.empty()) {
return;
}
job = jobs[0];
jobs.erase(jobs.begin());
}
if (job != NULL) {
job->Run();
}
}
void testAdd(int a, int b) {
std::lock_guard<std::mutex> lock(mutexConsole);
cout << a + b << endl;
}
void testMinus(int a, int b) {
std::lock_guard<std::mutex> lock(mutexConsole);
cout << a - b << endl;
}
void testVoid() {
std::lock_guard<std::mutex> lock(mutexConsole);
cout << "Void function" << endl;
}
void testPrint(std::string str) {
std::lock_guard<std::mutex> lock(mutexConsole);
cout << str << endl;
}
void thread1Func() {
deq();
deq();
deq();
}
void thread2Func() {
deq();
deq();
deq();
}
int main() {
enq(testAdd, 5, 3);
enq(testAdd, 10, 50);
enq(testMinus, 7, 20);
enq(testVoid);
enq(testPrint, "Hello");
std::thread t1(thread1Func);
std::thread t2(thread2Func);
t1.join();
t2.join();
return 0;
}
如您所见,我使用了几个互斥锁来进行概念验证线程处理,但是您应该按照计划在解决方案中使用线程池来使其更加健壮,并且还要注意我没有实现任何析构函数或删除您当然应该做的动态内存(将指针包装在 std::unique_ptr 中就足够了)。
我会保留我之前的解决方案,以防有人想涉足寄存器和堆栈管理,但我认为您会对这个解决方案感到最满意。