pthread C 中的线程队列 - Web 服务器响应流水线
Queue of Threads in pthread C - web server repsonse pipelining
我有一个用 C 实现的类似 HTTP Apache 的 Web 服务器,我的问题是我不知道如何初始化队列(因此不知道如何将线程排入队列),主要是因为我不确定如何在继续当前线程之前检查是否有前一个线程要加入。
服务器可以利用管道请求来提高其响应速度,使用线程
更复杂的方法:Web 服务器可以为每个新请求生成一个新线程
资源,同时准备响应;但是,由于必须返回资源
以服务器接收请求的相同顺序(FIFO)发送给客户端,它将
在各种响应线程之间采取协调阶段。
这个协调阶段是通过实施一种 "waiting room for the doctor"
其中每个病人在进入时询问谁是最后一个到达的,跟踪它并
只有当他面前的人离开时,他才能进入医生办公室。这样,每个人都有
队列的部分视图(只关心一个人)但是这个部分视图允许正确的
FIFO 队列的实现。
以下是我必须执行的操作的说明:
同样,每个新线程都必须存储处理前一个线程的标识符
使用系统调用 pthread_join () 请求并等待其终止。第一个线程,
显然,不必等待任何人,最后一个线程必须由主线程等待
在关闭连接本身之前处理该连接上的请求的线程和
返回等待新的连接请求。
我在正确初始化to_join数据结构时遇到问题,主要是因为我不明白如何计算要加入的线程的索引 i。-如何区分指针数组中的第一个和最后一个线程?
这是代码(我只能在 TO BE DONE START 和 TO BE DONE END 注释之间修改):
#include "incApache.h"
pthread_mutex_t accept_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t mime_mutex = PTHREAD_MUTEX_INITIALIZER;
int client_sockets[MAX_CONNECTIONS]; /* for each connection, its socket FD */
int no_response_threads[MAX_CONNECTIONS]; /* for each connection, how many response threads */
pthread_t thread_ids[MAX_THREADS];
int connection_no[MAX_THREADS]; /* connection_no[i] >= 0 means that i-th thread belongs to connection connection_no[i] */
pthread_t *to_join[MAX_THREADS]; /* for each thread, the pointer to the previous (response) thread, if any */
int no_free_threads = MAX_THREADS - 2 * MAX_CONNECTIONS; /* each connection has one thread listening and one reserved for replies */
struct response_params thread_params[MAX_THREADS - MAX_CONNECTIONS]; /* params for the response threads (the first MAX_CONNECTIONS threads are waiting/parsing requests) */
pthread_mutex_t threads_mutex = PTHREAD_MUTEX_INITIALIZER; /* protects the access to thread-related data structures */
pthread_t thread_ids[MAX_CONNECTIONS];
int connection_no[MAX_CONNECTIONS];
void *client_connection_thread(void *vp) {
int client_fd;
struct sockaddr_storage client_addr;
socklen_t addr_size;
pthread_mutex_lock(&threads_mutex);
int connection_no = *((int *) vp);
/*** properly initialize the thread queue to_join ***/
/*** TO BE DONE 3.1 START ***/
//to_join[0] = thread_ids[new_thread_idx];
//pthread_t *first; Am I perhaps supposed to initialize the to_join data structure as a queue with two pointers
//pthread_t *last; indicating the first and last element? How can I do it on an array of pointers?
/*** TO BE DONE 3.1 END ***/
pthread_mutex_unlock(&threads_mutex);
#endif
for (;;) {
addr_size = sizeof(client_addr);
pthread_mutex_lock(&accept_mutex);
if ((client_fd = accept(listen_fd, (struct sockaddr *) &client_addr, &addr_size)) == -1)
fail_errno("Cannot accept client connection");
pthread_mutex_unlock(&accept_mutex);
client_sockets[connection_no] = client_fd;
char str[INET_ADDRSTRLEN];
struct sockaddr_in *ipv4 = (struct sockaddr_in *) &client_addr;
printf("Accepted connection from %s\n", inet_ntop(AF_INET, &(ipv4->sin_addr), str, INET_ADDRSTRLEN));
manage_http_requests(client_fd
, connection_no);
}
}
#pragma clang diagnostic pop
void send_resp_thread(int out_socket, int response_code, int cookie,
int is_http1_0, int connection_idx, int new_thread_idx,
char *filename, struct stat *stat_p)
{
struct response_params *params = thread_params + (new_thread_idx - MAX_CONNECTIONS);
debug(" ... send_resp_thread(): idx=%lu\n", (unsigned long)(params - thread_params));
params->code = response_code;
params->cookie = cookie;
params->is_http1_0 = is_http1_0;
params->filename = filename ? my_strdup(filename) : NULL;
params->p_stat = stat_p;
pthread_mutex_lock(&threads_mutex);
connection_no[new_thread_idx] = connection_idx;
debug(" ... send_resp_thread(): parameters set, conn_no=%d\n", connection_idx);
/*** enqueue the current thread in the "to_join" data structure ***/
/*** TO BE DONE 3.1 START ***/
//Again, should I use a standard enqueue implementation? But then how would I keep track of the last node ot arrive?
/*** TO BE DONE 3.1 END ***/
if (pthread_create(thread_ids + new_thread_idx, NULL, response_thread, connection_no + new_thread_idx))
fail_errno("Could not create response thread");
pthread_mutex_unlock(&threads_mutex);
debug(" ... send_resp_thread(): new thread created\n");
}
void *response_thread(void *vp)
{
size_t thread_no = ((int *) vp) - connection_no;
int connection_idx = *((int *) vp);
debug(" ... response_thread() thread_no=%lu, conn_no=%d\n", (unsigned long) thread_no, connection_idx);
const size_t i = thread_no - MAX_CONNECTIONS;
send_response(client_sockets[connection_idx],
thread_params[i].code,
thread_params[i].cookie,
thread_params[i].is_http1_0,
(int)thread_no,
thread_params[i].filename,
thread_params[i].p_stat);
debug(" ... response_thread() freeing filename and stat\n");
free(thread_params[i].filename);
free(thread_params[i].p_stat);
return NULL;
}
I am having trouble initializing properly the to_join data structure,
mostly because I don't understand how to compute the index i of the
thread to join.- how can I differenciate the first and last thread in
an array of pointers?
赋值不同于初始化,对一个元素的操作不同于对整个数组的操作。据我所知,您实际上并没有在该函数中初始化 to_join
(因此评论具有误导性)。相反,您只需为单个元素分配适当的值。
该分析来自我对各种全局变量的名称、范围和文档注释的解释,以及相关函数的名称、签名和初始行:
- 似乎各种数组保存与多个连接的多个线程有关的数据,因为文件范围
connection_no
数组之一的作用是将线程与连接相关联。
- 看来该函数是连接相关线程的线程启动函数。
- 在任何其他与连接关联的线程正在运行时,任何线程都不应启动 运行 除了设置与自身相关的数据外,其他任何线程都不应启动,以免破坏其他线程和连接所依赖的数据。
现在,关于实际问题 -- 您如何确定新人应该加入哪个线程? 你不能。至少,不只是依赖问题中提供的模板代码,未修改。*
假设,如果您可以访问将线程与连接相关联的 connection_no
数组版本,那么您可以使用它来查找与相关联的所有线程的索引当前连接。然后,您可以从相应的 thread_ids
数组中获取它们的线程 ID(请注意,此处存在另一个名称冲突),并从 join_to
数组中获取它们的连接目标。连接的第一个线程是未连接到另一个的线程,最后一个是未被任何其他连接的线程。这种分析并不完全直截了当,但也没有真正的技巧。细节留作练习。
但是即使解决了文件范围名称冲突,您也无法执行上述分析,因为文件范围 connection_no
数组被整个区域中的同名局部变量覆盖您可以插入代码。*
另请注意,您似乎需要为新线程选择一个线程索引,该索引通常不会为 0。看起来您需要扫描 thread_ids
或 connection_no
数组查找可用索引。
*除非你作弊。我的意图是让您(仅)将代码插入 client_connection_thread
函数的主体,但实际上,您可以通过将代码插入指定区域来将该函数拆分为两个或多个。如果假设 connection_no
和 thread_ids
的第二个文件范围声明在实践中被忽略或丢失,那么拆分函数可以为阴影问题提供一个解决方法。例如:
/*** properly initialize the thread queue to_join ***/
/*** TO BE DONE 3.1 START ***/
return client_connection_thread_helper1(connection_no);
} // end of function
// parameter 'con' is the number of this thread's connection
void *client_connection_thread_helper1(int con) {
int my_index;
// ... Find an available thread index (TODO: what if there isn't one?) ...
thread_ids[my_index] = pthread_self();
connection_no[my_index] = con; // connection_no is not shadowed in this scope
pthread_t *last = NULL;
// ... Find the last (other) thread associated with connection 'con', if any ...
// You can determine the first, too, but that does not appear to be required.
to_join[my_index] = last;
return client_connection_thread_helper2(con);
}
// A second additional function is required for the remaining bits of
// client_connection_thread(), because they need the local connection_no
void *client_connection_thread_helper2(int connection_no) {
int client_fd;
struct sockaddr_storage client_addr;
socklen_t addr_size;
/*** TO BE DONE 3.1 END ***/
pthread_mutex_unlock(&threads_mutex);
我想弄清楚这种功能拆分的需求和实现可能是练习的一部分,但这将是一个肮脏的把戏,总的来说,练习似乎更有可能只是格式不正确。
我有一个用 C 实现的类似 HTTP Apache 的 Web 服务器,我的问题是我不知道如何初始化队列(因此不知道如何将线程排入队列),主要是因为我不确定如何在继续当前线程之前检查是否有前一个线程要加入。
服务器可以利用管道请求来提高其响应速度,使用线程 更复杂的方法:Web 服务器可以为每个新请求生成一个新线程 资源,同时准备响应;但是,由于必须返回资源 以服务器接收请求的相同顺序(FIFO)发送给客户端,它将 在各种响应线程之间采取协调阶段。
这个协调阶段是通过实施一种 "waiting room for the doctor" 其中每个病人在进入时询问谁是最后一个到达的,跟踪它并 只有当他面前的人离开时,他才能进入医生办公室。这样,每个人都有 队列的部分视图(只关心一个人)但是这个部分视图允许正确的 FIFO 队列的实现。
以下是我必须执行的操作的说明:
同样,每个新线程都必须存储处理前一个线程的标识符 使用系统调用 pthread_join () 请求并等待其终止。第一个线程, 显然,不必等待任何人,最后一个线程必须由主线程等待 在关闭连接本身之前处理该连接上的请求的线程和 返回等待新的连接请求。
我在正确初始化to_join数据结构时遇到问题,主要是因为我不明白如何计算要加入的线程的索引 i。-如何区分指针数组中的第一个和最后一个线程?
这是代码(我只能在 TO BE DONE START 和 TO BE DONE END 注释之间修改):
#include "incApache.h"
pthread_mutex_t accept_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t mime_mutex = PTHREAD_MUTEX_INITIALIZER;
int client_sockets[MAX_CONNECTIONS]; /* for each connection, its socket FD */
int no_response_threads[MAX_CONNECTIONS]; /* for each connection, how many response threads */
pthread_t thread_ids[MAX_THREADS];
int connection_no[MAX_THREADS]; /* connection_no[i] >= 0 means that i-th thread belongs to connection connection_no[i] */
pthread_t *to_join[MAX_THREADS]; /* for each thread, the pointer to the previous (response) thread, if any */
int no_free_threads = MAX_THREADS - 2 * MAX_CONNECTIONS; /* each connection has one thread listening and one reserved for replies */
struct response_params thread_params[MAX_THREADS - MAX_CONNECTIONS]; /* params for the response threads (the first MAX_CONNECTIONS threads are waiting/parsing requests) */
pthread_mutex_t threads_mutex = PTHREAD_MUTEX_INITIALIZER; /* protects the access to thread-related data structures */
pthread_t thread_ids[MAX_CONNECTIONS];
int connection_no[MAX_CONNECTIONS];
void *client_connection_thread(void *vp) {
int client_fd;
struct sockaddr_storage client_addr;
socklen_t addr_size;
pthread_mutex_lock(&threads_mutex);
int connection_no = *((int *) vp);
/*** properly initialize the thread queue to_join ***/
/*** TO BE DONE 3.1 START ***/
//to_join[0] = thread_ids[new_thread_idx];
//pthread_t *first; Am I perhaps supposed to initialize the to_join data structure as a queue with two pointers
//pthread_t *last; indicating the first and last element? How can I do it on an array of pointers?
/*** TO BE DONE 3.1 END ***/
pthread_mutex_unlock(&threads_mutex);
#endif
for (;;) {
addr_size = sizeof(client_addr);
pthread_mutex_lock(&accept_mutex);
if ((client_fd = accept(listen_fd, (struct sockaddr *) &client_addr, &addr_size)) == -1)
fail_errno("Cannot accept client connection");
pthread_mutex_unlock(&accept_mutex);
client_sockets[connection_no] = client_fd;
char str[INET_ADDRSTRLEN];
struct sockaddr_in *ipv4 = (struct sockaddr_in *) &client_addr;
printf("Accepted connection from %s\n", inet_ntop(AF_INET, &(ipv4->sin_addr), str, INET_ADDRSTRLEN));
manage_http_requests(client_fd
, connection_no);
}
}
#pragma clang diagnostic pop
void send_resp_thread(int out_socket, int response_code, int cookie,
int is_http1_0, int connection_idx, int new_thread_idx,
char *filename, struct stat *stat_p)
{
struct response_params *params = thread_params + (new_thread_idx - MAX_CONNECTIONS);
debug(" ... send_resp_thread(): idx=%lu\n", (unsigned long)(params - thread_params));
params->code = response_code;
params->cookie = cookie;
params->is_http1_0 = is_http1_0;
params->filename = filename ? my_strdup(filename) : NULL;
params->p_stat = stat_p;
pthread_mutex_lock(&threads_mutex);
connection_no[new_thread_idx] = connection_idx;
debug(" ... send_resp_thread(): parameters set, conn_no=%d\n", connection_idx);
/*** enqueue the current thread in the "to_join" data structure ***/
/*** TO BE DONE 3.1 START ***/
//Again, should I use a standard enqueue implementation? But then how would I keep track of the last node ot arrive?
/*** TO BE DONE 3.1 END ***/
if (pthread_create(thread_ids + new_thread_idx, NULL, response_thread, connection_no + new_thread_idx))
fail_errno("Could not create response thread");
pthread_mutex_unlock(&threads_mutex);
debug(" ... send_resp_thread(): new thread created\n");
}
void *response_thread(void *vp)
{
size_t thread_no = ((int *) vp) - connection_no;
int connection_idx = *((int *) vp);
debug(" ... response_thread() thread_no=%lu, conn_no=%d\n", (unsigned long) thread_no, connection_idx);
const size_t i = thread_no - MAX_CONNECTIONS;
send_response(client_sockets[connection_idx],
thread_params[i].code,
thread_params[i].cookie,
thread_params[i].is_http1_0,
(int)thread_no,
thread_params[i].filename,
thread_params[i].p_stat);
debug(" ... response_thread() freeing filename and stat\n");
free(thread_params[i].filename);
free(thread_params[i].p_stat);
return NULL;
}
I am having trouble initializing properly the to_join data structure, mostly because I don't understand how to compute the index i of the thread to join.- how can I differenciate the first and last thread in an array of pointers?
赋值不同于初始化,对一个元素的操作不同于对整个数组的操作。据我所知,您实际上并没有在该函数中初始化 to_join
(因此评论具有误导性)。相反,您只需为单个元素分配适当的值。
该分析来自我对各种全局变量的名称、范围和文档注释的解释,以及相关函数的名称、签名和初始行:
- 似乎各种数组保存与多个连接的多个线程有关的数据,因为文件范围
connection_no
数组之一的作用是将线程与连接相关联。 - 看来该函数是连接相关线程的线程启动函数。
- 在任何其他与连接关联的线程正在运行时,任何线程都不应启动 运行 除了设置与自身相关的数据外,其他任何线程都不应启动,以免破坏其他线程和连接所依赖的数据。
现在,关于实际问题 -- 您如何确定新人应该加入哪个线程? 你不能。至少,不只是依赖问题中提供的模板代码,未修改。*
假设,如果您可以访问将线程与连接相关联的 connection_no
数组版本,那么您可以使用它来查找与相关联的所有线程的索引当前连接。然后,您可以从相应的 thread_ids
数组中获取它们的线程 ID(请注意,此处存在另一个名称冲突),并从 join_to
数组中获取它们的连接目标。连接的第一个线程是未连接到另一个的线程,最后一个是未被任何其他连接的线程。这种分析并不完全直截了当,但也没有真正的技巧。细节留作练习。
但是即使解决了文件范围名称冲突,您也无法执行上述分析,因为文件范围 connection_no
数组被整个区域中的同名局部变量覆盖您可以插入代码。*
另请注意,您似乎需要为新线程选择一个线程索引,该索引通常不会为 0。看起来您需要扫描 thread_ids
或 connection_no
数组查找可用索引。
*除非你作弊。我的意图是让您(仅)将代码插入 client_connection_thread
函数的主体,但实际上,您可以通过将代码插入指定区域来将该函数拆分为两个或多个。如果假设 connection_no
和 thread_ids
的第二个文件范围声明在实践中被忽略或丢失,那么拆分函数可以为阴影问题提供一个解决方法。例如:
/*** properly initialize the thread queue to_join ***/
/*** TO BE DONE 3.1 START ***/
return client_connection_thread_helper1(connection_no);
} // end of function
// parameter 'con' is the number of this thread's connection
void *client_connection_thread_helper1(int con) {
int my_index;
// ... Find an available thread index (TODO: what if there isn't one?) ...
thread_ids[my_index] = pthread_self();
connection_no[my_index] = con; // connection_no is not shadowed in this scope
pthread_t *last = NULL;
// ... Find the last (other) thread associated with connection 'con', if any ...
// You can determine the first, too, but that does not appear to be required.
to_join[my_index] = last;
return client_connection_thread_helper2(con);
}
// A second additional function is required for the remaining bits of
// client_connection_thread(), because they need the local connection_no
void *client_connection_thread_helper2(int connection_no) {
int client_fd;
struct sockaddr_storage client_addr;
socklen_t addr_size;
/*** TO BE DONE 3.1 END ***/
pthread_mutex_unlock(&threads_mutex);
我想弄清楚这种功能拆分的需求和实现可能是练习的一部分,但这将是一个肮脏的把戏,总的来说,练习似乎更有可能只是格式不正确。