优先队列与 pthreads 同步
Priority Queue synchronization with pthreads
我正在完成一项大学作业,我们要为 15 puzzle 实施并行 A* 搜索。对于这一部分,我们将只使用一个优先级队列(我想看到多个线程的争用会限制加速)。我面临的一个问题是正确同步从优先级队列中弹出下一个 "candidate"。
我尝试了以下方法:
while(1) {
// The board I'm trying to pop.
Board current_board;
pthread_mutex_lock(&priority_queue_lock);
// If the heap is empty, wait till another thread adds new candidates.
if (pq->heap_size == 0)
{
printf("Waiting...\n");
pthread_mutex_unlock(&priority_queue_lock);
continue;
}
current_board = top(pq);
pthread_mutex_unlock(&priority_queue_lock);
// Generate the new boards from the current one and add to the heap...
}
我尝试过同一想法的不同变体,但由于某些原因,线程有时会卡在 "Waiting" 上。该代码串行(或使用两个线程)运行良好,所以这让我相信这是代码中有问题的部分。如有必要,我可以 post 全部。我觉得这是我对互斥锁的理解的问题。在此先感谢您的帮助。
编辑:
我在下面添加了并行线程的完整代码:
// h and p are global pointers initialized in main()
void* parallelThread(void* arg)
{
int thread_id = (int)(long long)(arg);
while(1)
{
Board current_board;
pthread_mutex_lock(&priority_queue_lock);
current_board = top(p);
pthread_mutex_unlock(&priority_queue_lock);
// Move blank up.
if (current_board.blank_x > 0)
{
int newpos = current_board.blank_x - 1;
Board new_board = current_board;
new_board.board[current_board.blank_x][current_board.blank_y] = new_board.board[newpos][current_board.blank_y];
new_board.board[newpos][current_board.blank_y] = BLANK;
new_board.blank_x = newpos;
new_board.goodness = get_goodness(new_board.board);
new_board.turncount++;
if (check_solved(new_board))
{
printf("Solved in %d turns",new_board.turncount);
exit(0);
}
if (!exists(h,new_board))
{
insert(h,new_board);
push(p,new_board);
}
}
// Move blank down.
if (current_board.blank_x < 3)
{
int newpos = current_board.blank_x + 1;
Board new_board = current_board;
new_board.board[current_board.blank_x][current_board.blank_y] = new_board.board[newpos][current_board.blank_y];
new_board.board[newpos][current_board.blank_y] = BLANK;
new_board.blank_x = newpos;
new_board.goodness = get_goodness(new_board.board);
new_board.turncount++;
if (check_solved(new_board))
{
printf("Solved in %d turns",new_board.turncount);
exit(0);
}
if (!exists(h,new_board))
{
insert(h,new_board);
push(p,new_board);
}
}
// Move blank right.
if (current_board.blank_y < 3)
{
int newpos = current_board.blank_y + 1;
Board new_board = current_board;
new_board.board[current_board.blank_x][current_board.blank_y] = new_board.board[current_board.blank_x][newpos];
new_board.board[current_board.blank_x][newpos] = BLANK;
new_board.blank_y = newpos;
new_board.goodness = get_goodness(new_board.board);
new_board.turncount++;
if (check_solved(new_board))
{
printf("Solved in %d turns",new_board.turncount);
exit(0);
}
if (!exists(h,new_board))
{
insert(h,new_board);
push(p,new_board);
}
}
// Move blank left.
if (current_board.blank_y > 0)
{
int newpos = current_board.blank_y - 1;
Board new_board = current_board;
new_board.board[current_board.blank_x][current_board.blank_y] = new_board.board[current_board.blank_x][newpos];
new_board.board[current_board.blank_x][newpos] = BLANK;
new_board.blank_y = newpos;
new_board.goodness = get_goodness(new_board.board);
new_board.turncount++;
if (check_solved(new_board))
{
printf("Solved in %d turns",new_board.turncount);
exit(0);
}
if (!exists(h,new_board))
{
insert(h,new_board);
push(p,new_board);
}
}
}
return NULL;
}
I tried the following:
假设 top
也从队列中删除了板,我没有发现后面的代码有任何问题。这很浪费(如果队列为空,它会自旋锁定和解锁互斥量),但不会错。
I've added the full code
如果没有 exists
、insert
和 push
的代码,这将毫无用处。
一般观察:
pthread_mutex_lock(&priority_queue_lock);
current_board = top(p);
pthread_mutex_unlock(&priority_queue_lock);
在上面的代码中,您的锁定是 top
函数的 "ouside"。但是在这里:
if (!exists(h,new_board))
{
insert(h,new_board);
push(p,new_board);
}
你要么根本不加锁(在这种情况下这是一个错误),要么加锁 "inside" exists
、insert
和 push
.
您不应混合使用 "inside" 和 "outside" 锁定。选择一个或另一个并坚持下去。
如果你实际上没有在 exists
、insert
等内部锁定队列,那么你就会发生数据竞争并且错误地考虑了互斥量:它们保护 不变量,并且您无法检查队列是否为空 与另一个线程并行 执行 "remove top element" -- 这些操作需要序列化,因此必须都是在锁下完成。
我正在完成一项大学作业,我们要为 15 puzzle 实施并行 A* 搜索。对于这一部分,我们将只使用一个优先级队列(我想看到多个线程的争用会限制加速)。我面临的一个问题是正确同步从优先级队列中弹出下一个 "candidate"。
我尝试了以下方法:
while(1) {
// The board I'm trying to pop.
Board current_board;
pthread_mutex_lock(&priority_queue_lock);
// If the heap is empty, wait till another thread adds new candidates.
if (pq->heap_size == 0)
{
printf("Waiting...\n");
pthread_mutex_unlock(&priority_queue_lock);
continue;
}
current_board = top(pq);
pthread_mutex_unlock(&priority_queue_lock);
// Generate the new boards from the current one and add to the heap...
}
我尝试过同一想法的不同变体,但由于某些原因,线程有时会卡在 "Waiting" 上。该代码串行(或使用两个线程)运行良好,所以这让我相信这是代码中有问题的部分。如有必要,我可以 post 全部。我觉得这是我对互斥锁的理解的问题。在此先感谢您的帮助。
编辑: 我在下面添加了并行线程的完整代码:
// h and p are global pointers initialized in main()
void* parallelThread(void* arg)
{
int thread_id = (int)(long long)(arg);
while(1)
{
Board current_board;
pthread_mutex_lock(&priority_queue_lock);
current_board = top(p);
pthread_mutex_unlock(&priority_queue_lock);
// Move blank up.
if (current_board.blank_x > 0)
{
int newpos = current_board.blank_x - 1;
Board new_board = current_board;
new_board.board[current_board.blank_x][current_board.blank_y] = new_board.board[newpos][current_board.blank_y];
new_board.board[newpos][current_board.blank_y] = BLANK;
new_board.blank_x = newpos;
new_board.goodness = get_goodness(new_board.board);
new_board.turncount++;
if (check_solved(new_board))
{
printf("Solved in %d turns",new_board.turncount);
exit(0);
}
if (!exists(h,new_board))
{
insert(h,new_board);
push(p,new_board);
}
}
// Move blank down.
if (current_board.blank_x < 3)
{
int newpos = current_board.blank_x + 1;
Board new_board = current_board;
new_board.board[current_board.blank_x][current_board.blank_y] = new_board.board[newpos][current_board.blank_y];
new_board.board[newpos][current_board.blank_y] = BLANK;
new_board.blank_x = newpos;
new_board.goodness = get_goodness(new_board.board);
new_board.turncount++;
if (check_solved(new_board))
{
printf("Solved in %d turns",new_board.turncount);
exit(0);
}
if (!exists(h,new_board))
{
insert(h,new_board);
push(p,new_board);
}
}
// Move blank right.
if (current_board.blank_y < 3)
{
int newpos = current_board.blank_y + 1;
Board new_board = current_board;
new_board.board[current_board.blank_x][current_board.blank_y] = new_board.board[current_board.blank_x][newpos];
new_board.board[current_board.blank_x][newpos] = BLANK;
new_board.blank_y = newpos;
new_board.goodness = get_goodness(new_board.board);
new_board.turncount++;
if (check_solved(new_board))
{
printf("Solved in %d turns",new_board.turncount);
exit(0);
}
if (!exists(h,new_board))
{
insert(h,new_board);
push(p,new_board);
}
}
// Move blank left.
if (current_board.blank_y > 0)
{
int newpos = current_board.blank_y - 1;
Board new_board = current_board;
new_board.board[current_board.blank_x][current_board.blank_y] = new_board.board[current_board.blank_x][newpos];
new_board.board[current_board.blank_x][newpos] = BLANK;
new_board.blank_y = newpos;
new_board.goodness = get_goodness(new_board.board);
new_board.turncount++;
if (check_solved(new_board))
{
printf("Solved in %d turns",new_board.turncount);
exit(0);
}
if (!exists(h,new_board))
{
insert(h,new_board);
push(p,new_board);
}
}
}
return NULL;
}
I tried the following:
假设 top
也从队列中删除了板,我没有发现后面的代码有任何问题。这很浪费(如果队列为空,它会自旋锁定和解锁互斥量),但不会错。
I've added the full code
如果没有 exists
、insert
和 push
的代码,这将毫无用处。
一般观察:
pthread_mutex_lock(&priority_queue_lock);
current_board = top(p);
pthread_mutex_unlock(&priority_queue_lock);
在上面的代码中,您的锁定是 top
函数的 "ouside"。但是在这里:
if (!exists(h,new_board))
{
insert(h,new_board);
push(p,new_board);
}
你要么根本不加锁(在这种情况下这是一个错误),要么加锁 "inside" exists
、insert
和 push
.
您不应混合使用 "inside" 和 "outside" 锁定。选择一个或另一个并坚持下去。
如果你实际上没有在 exists
、insert
等内部锁定队列,那么你就会发生数据竞争并且错误地考虑了互斥量:它们保护 不变量,并且您无法检查队列是否为空 与另一个线程并行 执行 "remove top element" -- 这些操作需要序列化,因此必须都是在锁下完成。