python multiprocessing .join() deadlock depends on worker function
I'm using the multiprocessing Python library to spawn 4 Process() objects to parallelize a CPU-intensive task. The task (inspiration and code from this great article) is to compute the prime factors of every integer in a list.
main.py:
import random
import multiprocessing
import sys

num_inputs = 4000
num_procs = 4
proc_inputs = num_inputs/num_procs
input_list = [int(1000*random.random()) for i in xrange(num_inputs)]

output_queue = multiprocessing.Queue()
procs = []
for p_i in xrange(num_procs):
    print "Process [%d]"%p_i
    proc_list = input_list[proc_inputs * p_i:proc_inputs * (p_i + 1)]
    print " - num inputs: [%d]"%len(proc_list)

    # Using target=worker1 HANGS on join
    p = multiprocessing.Process(target=worker1, args=(p_i, proc_list, output_queue))
    # Using target=worker2 RETURNS with success
    #p = multiprocessing.Process(target=worker2, args=(p_i, proc_list, output_queue))

    procs.append(p)
    p.start()

for p in procs:
    print "joining ", p, output_queue.qsize(), output_queue.full()
    p.join()
    print "joined ", p, output_queue.qsize(), output_queue.full()

print "Processing complete."
ret_vals = []
while output_queue.empty() == False:
    ret_vals.append(output_queue.get())
print len(ret_vals)
print sys.getsizeof(ret_vals)
Observations:
- If the target of each process is the function worker1, then for an input list larger than 4000 elements the main thread gets stuck on .join(), waiting for the spawned processes to terminate, and never returns.
- If the target of each process is the function worker2, then for the same input list the code works fine and the main thread returns.
This is very confusing to me, as the only difference between worker1 and worker2 (see below) is that the former inserts individual lists into the Queue while the latter inserts a single list of lists per process.
Why is there a deadlock when using the worker1 target but not when using worker2?
Shouldn't both (or neither) of them exceed the Multiprocessing Queue maxsize limit is 32767?
worker1 vs worker2:
def worker1(proc_num, proc_list, output_queue):
    '''worker function which deadlocks'''
    for num in proc_list:
        output_queue.put(factorize_naive(num))

def worker2(proc_num, proc_list, output_queue):
    '''worker function that works'''
    workers_stuff = []
    for num in proc_list:
        workers_stuff.append(factorize_naive(num))
    output_queue.put(workers_stuff)
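For reference, factorize_naive comes from the linked article and isn't reproduced above; a minimal trial-division sketch along those lines (not necessarily the article's exact code) is:

def factorize_naive(n):
    '''Return the list of prime factors of n by naive trial division.'''
    factors = []
    p = 2
    while p * p <= n:
        while n % p == 0:
            factors.append(p)
            n //= p
        p += 1
    if n > 1:          # whatever remains after trial division is itself prime
        factors.append(n)
    return factors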
There are many similar questions on SO, but I believe the core of this question is clearly distinct from all of them.
Related links:
- https://sopython.com/canon/82/programs-using-multiprocessing-hang-deadlock-and-never-complete/
- python multiprocessing - process hangs on join for large queue
- Script using multiprocessing module does not terminate
- Why does multiprocessing.Process.join() hang?
- When to call .join() on a process?
- What exactly is Python multiprocessing Module's .join() Method Doing?
The docs warn:
Warning: As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe.
This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children.
While the Queue appears to be unbounded, under the covers queued items are buffered in memory to avoid overloading the inter-process pipe. A process cannot end normally until those memory buffers have been flushed to the pipe. Your worker1() puts a lot more items on the queue than your worker2() does, and that's all there is to it. Note that the number of items that can be queued before the implementation resorts to buffering in memory isn't defined: it can vary across OS and Python release.
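A minimal sketch of that effect, separate from the question's code (the item count of 100000 is an assumption; the actual threshold depends on the OS pipe buffer and Python version):

import multiprocessing

def put_many(q, n):
    # Each put() hands the item to the queue's feeder thread, which must
    # flush it into the pipe before the child can exit; with no reader on
    # the other end, the pipe fills up and the feeder thread blocks.
    for i in xrange(n):
        q.put(i)

if __name__ == "__main__":
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=put_many, args=(q, 100000))
    p.start()
    p.join(5)                                  # would block forever without a timeout
    print "child still alive:", p.is_alive()   # expected True: stuck flushing its buffer
    p.terminate()                              # kill the stuck child so this demo can exit
    p.join()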
As the docs suggest, the normal way to avoid this is to .get() all the items off the queue before you attempt to .join() the processes. As you discovered, whether it's necessary to do so depends, in an undefined way, on how many items each worker process has put on the queue.
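Applied to the question's main.py, that means draining the queue before joining. A sketch in the question's Python 2 style, assuming the worker1 target so that exactly num_inputs items end up on the queue:

# Drain the queue first: with worker1, each input produces exactly one item.
ret_vals = []
for i in xrange(num_inputs):
    ret_vals.append(output_queue.get())   # blocks until a worker puts the next result

# The workers' feeder buffers are now flushed, so join() can return.
for p in procs:
    p.join()

print "Processing complete.", len(ret_vals)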