使用队列问题多处理
Issues Multiprocessing with Queues
情况:我有一个用 Python 编写的文件处理器。这些文件将 "walked" 并放入队列中。然后将使用 multirocessing
进行处理
问题:参考下面的代码
fileA.py
==========
import Queue
import os
def walker():
filelist = Queue.Queue()
queue_end = Object()
for root, dirs, files in os.walk('/'):
for f in files:
path = os.path.join(root,f)
if not os.path.islink(path):
filelist.put(path)
filelist.put(queue_end)
fileB.py
===========
import fileA
import os
import multiprocessing as mp
def processor(queuelock):
while True:
with queuelock:
filepath = fileA.filelist.get()
if filepath is fileA.queue_end:
filelist.put(queue_end)
break
#example of a job
os.move(filepath, "/home/newuser" + filepath)
print filepath + " has been moved!"
if __name__ == '__main__':
fileA.walker()
queuelock = mp.Lock()
jobs = []
for i in range(0,mp.cpu_count()):
process = mp.Process(target=processor(queuelock))
jobs.append(process)
process.start()
问题是在移动文件时,所有进程都会尝试移动完全相同的文件,即使它应该已从队列中删除。
示例输出:
randomFile as been moved!
Error: ......... randomFile not found
Error: ......... randomFile not found
Error: ......... randomFile not found
这意味着每个生成的进程都将完全相同的文件从队列中取出,并试图对同一文件执行相同的进程。
问题:我是否做错了什么,出于某种原因,filelist
队列已被发送到每个进程(现在正在发生的事情),而不是 filelist
队列由所有进程共享(我的预期结果)?
filelist
目前只是 walker()
的局部变量,并且队列对象不与代码的其他部分共享,因此至少 [=12= walker()
.
中需要 ]
要在多个进程之间共享同一个队列,需要multiprocessing.Queue
。 queue.Queue
在一个进程被分叉(或生成)时被复制,因此它成为每个进程的一个新的独立队列。
情况:我有一个用 Python 编写的文件处理器。这些文件将 "walked" 并放入队列中。然后将使用 multirocessing
问题:参考下面的代码
fileA.py
==========
import Queue
import os
def walker():
filelist = Queue.Queue()
queue_end = Object()
for root, dirs, files in os.walk('/'):
for f in files:
path = os.path.join(root,f)
if not os.path.islink(path):
filelist.put(path)
filelist.put(queue_end)
fileB.py
===========
import fileA
import os
import multiprocessing as mp
def processor(queuelock):
while True:
with queuelock:
filepath = fileA.filelist.get()
if filepath is fileA.queue_end:
filelist.put(queue_end)
break
#example of a job
os.move(filepath, "/home/newuser" + filepath)
print filepath + " has been moved!"
if __name__ == '__main__':
fileA.walker()
queuelock = mp.Lock()
jobs = []
for i in range(0,mp.cpu_count()):
process = mp.Process(target=processor(queuelock))
jobs.append(process)
process.start()
问题是在移动文件时,所有进程都会尝试移动完全相同的文件,即使它应该已从队列中删除。
示例输出:
randomFile as been moved!
Error: ......... randomFile not found
Error: ......... randomFile not found
Error: ......... randomFile not found
这意味着每个生成的进程都将完全相同的文件从队列中取出,并试图对同一文件执行相同的进程。
问题:我是否做错了什么,出于某种原因,filelist
队列已被发送到每个进程(现在正在发生的事情),而不是 filelist
队列由所有进程共享(我的预期结果)?
filelist
目前只是walker()
的局部变量,并且队列对象不与代码的其他部分共享,因此至少 [=12=walker()
. 中需要 ]
要在多个进程之间共享同一个队列,需要
multiprocessing.Queue
。queue.Queue
在一个进程被分叉(或生成)时被复制,因此它成为每个进程的一个新的独立队列。