Python 使用队列进行多处理(动态拆分负载)
Python multiprocessing with Queue (split loads dynamically)
我正在尝试使用多处理来处理大量文件。
我试图将文件列表放入队列中,并让 3 个工作人员使用一种常见的队列数据类型来分担负载。但是,这似乎不起作用。可能我误解了 multiprocessing 包中的队列。
下面是示例源代码:
import multiprocessing
from multiprocessing import Queue
def worker(i, qu):
"""worker function"""
while ~qu.empty():
val=qu.get()
print 'Worker:',i, ' start with file:',val
j=1
for k in range(i*10000,(i+1)*10000): # some time consuming process
for j in range(i*10000,(i+1)*10000):
j=j+k
print 'Worker:',i, ' end with file:',val
if __name__ == '__main__':
jobs = []
qu=Queue()
for j in range(100,110): # files numbers are from 100 to 110
qu.put(j)
for i in range(3): # 3 multiprocess
p = multiprocessing.Process(target=worker, args=(i,qu))
jobs.append(p)
p.start()
p.join()
感谢您的评论。
我开始知道使用 Pool 是最好的解决方案。
import multiprocessing
import time
def worker(val):
"""worker function"""
print 'Worker: start with file:',val
time.sleep(1.1)
print 'Worker: end with file:',val
if __name__ == '__main__':
file_list=range(100,110)
p = multiprocessing.Pool(2)
p.map(worker, file_list)
您只是加入了您创建的最后一个进程。这意味着如果第一个或第二个进程仍在运行而第三个进程已完成,则您的主进程正在关闭并在剩余进程完成之前将其杀死。
你应该加入他们,以便等到他们完成:
for p in jobs:
p.join()
另一件事是您应该考虑使用 qu.get_nowait()
以消除 qu.empty()
和 qu.get()
之间的竞争条件。
例如:
try:
while 1:
message = self.queue.get_nowait()
""" do something fancy here """
except Queue.Empty:
pass
希望对你有所帮助
两期:
1) 您仅在第 3 个过程中加入
2)为什么不用multiprocessing.Pool?
3) qu.get()
上的竞争条件
1 & 3)
import multiprocessing
from multiprocessing import Queue
def worker(i, qu):
"""worker function"""
while 1:
try:
val=qu.get(timeout)
except Queue.Empty: break# Yay no race condition
print 'Worker:',i, ' start with file:',val
j=1
for k in range(i*10000,(i+1)*10000): # some time consuming process
for j in range(i*10000,(i+1)*10000):
j=j+k
print 'Worker:',i, ' end with file:',val
if __name__ == '__main__':
jobs = []
qu=Queue()
for j in range(100,110): # files numbers are from 100 to 110
qu.put(j)
for i in range(3): # 3 multiprocess
p = multiprocessing.Process(target=worker, args=(i,qu))
jobs.append(p)
p.start()
for p in jobs: #<--- join on all processes ...
p.join()
2)
有关如何使用池,请参阅:
我正在尝试使用多处理来处理大量文件。 我试图将文件列表放入队列中,并让 3 个工作人员使用一种常见的队列数据类型来分担负载。但是,这似乎不起作用。可能我误解了 multiprocessing 包中的队列。 下面是示例源代码:
import multiprocessing
from multiprocessing import Queue
def worker(i, qu):
"""worker function"""
while ~qu.empty():
val=qu.get()
print 'Worker:',i, ' start with file:',val
j=1
for k in range(i*10000,(i+1)*10000): # some time consuming process
for j in range(i*10000,(i+1)*10000):
j=j+k
print 'Worker:',i, ' end with file:',val
if __name__ == '__main__':
jobs = []
qu=Queue()
for j in range(100,110): # files numbers are from 100 to 110
qu.put(j)
for i in range(3): # 3 multiprocess
p = multiprocessing.Process(target=worker, args=(i,qu))
jobs.append(p)
p.start()
p.join()
感谢您的评论。 我开始知道使用 Pool 是最好的解决方案。
import multiprocessing
import time
def worker(val):
"""worker function"""
print 'Worker: start with file:',val
time.sleep(1.1)
print 'Worker: end with file:',val
if __name__ == '__main__':
file_list=range(100,110)
p = multiprocessing.Pool(2)
p.map(worker, file_list)
您只是加入了您创建的最后一个进程。这意味着如果第一个或第二个进程仍在运行而第三个进程已完成,则您的主进程正在关闭并在剩余进程完成之前将其杀死。
你应该加入他们,以便等到他们完成:
for p in jobs:
p.join()
另一件事是您应该考虑使用 qu.get_nowait()
以消除 qu.empty()
和 qu.get()
之间的竞争条件。
例如:
try:
while 1:
message = self.queue.get_nowait()
""" do something fancy here """
except Queue.Empty:
pass
希望对你有所帮助
两期:
1) 您仅在第 3 个过程中加入
2)为什么不用multiprocessing.Pool?
3) qu.get()
上的竞争条件1 & 3)
import multiprocessing
from multiprocessing import Queue
def worker(i, qu):
"""worker function"""
while 1:
try:
val=qu.get(timeout)
except Queue.Empty: break# Yay no race condition
print 'Worker:',i, ' start with file:',val
j=1
for k in range(i*10000,(i+1)*10000): # some time consuming process
for j in range(i*10000,(i+1)*10000):
j=j+k
print 'Worker:',i, ' end with file:',val
if __name__ == '__main__':
jobs = []
qu=Queue()
for j in range(100,110): # files numbers are from 100 to 110
qu.put(j)
for i in range(3): # 3 multiprocess
p = multiprocessing.Process(target=worker, args=(i,qu))
jobs.append(p)
p.start()
for p in jobs: #<--- join on all processes ...
p.join()
2)
有关如何使用池,请参阅: