Multiprocess.Process 未退出空队列
Multiprocess.Process not exiting on empty Queue
我有大量对象要迭代,我认为多处理会大大加快工作速度。但是,一旦我增加核心数,这个简单的示例似乎就会挂起。
它挂在 p.join()
行,如果我终止并检查,q_in.empty()
returns True
并且输出队列有适当数量的项目。
是什么导致它挂起?
from multiprocessing import Process, Queue
import time
def worker_func(q_in, q_out, w):
time.sleep(1)
while not q_in.empty():
# Simple code standing in for more complex operation
q_out.put(str(w) + '_' + str(q_in.get()))
def setup_func(x):
q_in = Queue()
for w in range(x):
q_in.put(w)
q_out = Queue()
return((q_in, q_out))
def test_func(num_cores, q_in, q_out):
processes = []
for w in range(num_cores):
p = Process(target=worker_func, args=(q_in, q_out, w))
processes.append(p)
p.start()
for p in processes:
p.join()
output_ls = []
while not q_out.empty():
output_ls.append(q_out.get())
return(output_ls)
q_in, q_out = setup_func(1000)
test_func(1, q_in, q_out) # This returns without issue for num_cores = 1 or 2
q_in, q_out = setup_func(1000)
test_func(5, q_in, q_out) # This hangs for num_cores = 5
您有多个进程从队列中拉取。队列可能有数据,但当您开始获取数据时,另一个进程已经使用了它。 multiprocessing.Queue.empty 说 由于 multithreading/multiprocessing 语义,这不可靠。
另一种方法是在队列的末尾放置一个进程结束哨兵,每个进程一个。当进程看到哨兵时,它就会退出。对于您的情况,None
是一个不错的选择。
from multiprocessing import Process, Queue
import time
def worker_func(q_in, q_out, w):
time.sleep(1)
while True:
msg = q_in.get()
if msg is None:
break
q_out.put(str(w) + '_' + str(msg))
def setup_func(x):
q_in = Queue()
for w in range(x):
q_in.put(w)
q_out = Queue()
return((q_in, q_out))
def test_func(num_cores, q_in, q_out):
processes = []
for w in range(num_cores):
q_in.put(None)
p = Process(target=worker_func, args=(q_in, q_out, w))
processes.append(p)
p.start()
for p in processes:
p.join()
output_ls = []
while not q_out.empty():
output_ls.append(q_out.get())
return(output_ls)
q_in, q_out = setup_func(1000)
test_func(1, q_in, q_out) # This returns without issue for num_cores = 1 or 2
q_in, q_out = setup_func(1000)
test_func(5, q_in, q_out) # This hangs for num_cores = 5
我有大量对象要迭代,我认为多处理会大大加快工作速度。但是,一旦我增加核心数,这个简单的示例似乎就会挂起。
它挂在 p.join()
行,如果我终止并检查,q_in.empty()
returns True
并且输出队列有适当数量的项目。
是什么导致它挂起?
from multiprocessing import Process, Queue
import time
def worker_func(q_in, q_out, w):
time.sleep(1)
while not q_in.empty():
# Simple code standing in for more complex operation
q_out.put(str(w) + '_' + str(q_in.get()))
def setup_func(x):
q_in = Queue()
for w in range(x):
q_in.put(w)
q_out = Queue()
return((q_in, q_out))
def test_func(num_cores, q_in, q_out):
processes = []
for w in range(num_cores):
p = Process(target=worker_func, args=(q_in, q_out, w))
processes.append(p)
p.start()
for p in processes:
p.join()
output_ls = []
while not q_out.empty():
output_ls.append(q_out.get())
return(output_ls)
q_in, q_out = setup_func(1000)
test_func(1, q_in, q_out) # This returns without issue for num_cores = 1 or 2
q_in, q_out = setup_func(1000)
test_func(5, q_in, q_out) # This hangs for num_cores = 5
您有多个进程从队列中拉取。队列可能有数据,但当您开始获取数据时,另一个进程已经使用了它。 multiprocessing.Queue.empty 说 由于 multithreading/multiprocessing 语义,这不可靠。
另一种方法是在队列的末尾放置一个进程结束哨兵,每个进程一个。当进程看到哨兵时,它就会退出。对于您的情况,None
是一个不错的选择。
from multiprocessing import Process, Queue
import time
def worker_func(q_in, q_out, w):
time.sleep(1)
while True:
msg = q_in.get()
if msg is None:
break
q_out.put(str(w) + '_' + str(msg))
def setup_func(x):
q_in = Queue()
for w in range(x):
q_in.put(w)
q_out = Queue()
return((q_in, q_out))
def test_func(num_cores, q_in, q_out):
processes = []
for w in range(num_cores):
q_in.put(None)
p = Process(target=worker_func, args=(q_in, q_out, w))
processes.append(p)
p.start()
for p in processes:
p.join()
output_ls = []
while not q_out.empty():
output_ls.append(q_out.get())
return(output_ls)
q_in, q_out = setup_func(1000)
test_func(1, q_in, q_out) # This returns without issue for num_cores = 1 or 2
q_in, q_out = setup_func(1000)
test_func(5, q_in, q_out) # This hangs for num_cores = 5