如何在脚本运行时向多处理队列添加更多项目
how to add more items to a multiprocessing queue while script in motion
我正在尝试使用队列学习多处理。
我想做的是在脚本运行时找出 when/how 到 "add more items to the queue"。
下面的脚本是我工作的基准:
import multiprocessing
class MyFancyClass:
def __init__(self, name):
self.name = name
def do_something(self):
proc_name = multiprocessing.current_process().name
print('Doing something fancy in {} for {}!'.format(
proc_name, self.name))
def worker(q):
obj = q.get()
obj.do_something()
if __name__ == '__main__':
queue = multiprocessing.Queue()
p = multiprocessing.Process(target=worker, args=(queue,))
p.start()
queue.put(MyFancyClass('Fancy Dan'))
queue.put(MyFancyClass('Frankie'))
print(queue.qsize())
# Wait for the worker to finish
queue.close()
queue.join_thread()
p.join()
在第 26 行,Fancy Dan
注入有效,但 Frankie
部分无效。我能够确认 Frankie
确实进入了队列。我需要一个可以 "Check for more items" 并根据需要将它们插入队列的位置。如果没有更多项目存在,则在现有项目清除后关闭队列。
我该怎么做?
谢谢!
一种方法是将工作人员更改为
def worker(q):
while not q.empty():
obj = q.get()
obj.do_something()
您的原始代码的问题是工作人员 returns 在处理队列中的一项后。您需要某种循环逻辑。
此解决方案不完善,因为 empty() 不是 reliable。如果在向队列中添加更多项目之前队列变空,也会失败(该过程只会 return)。
我建议使用 Process Pool Executor。
Submit 非常接近您要找的内容。
让我们说清楚:
- 目标函数
worker(q)
在上述方案中只会被调用一次。在第一次调用时,该函数将暂停等待阻塞操作的结果 q.get()
。它从 queue
获取实例 MyFancyClass('Fancy Dan')
,调用其 do_something
方法并完成。
MyFancyClass('Frankie')
将被放入队列但不会进入进程,因为进程的目标函数已完成。
- 其中一种方法是从队列中读取并等待 signal/marked 表明队列使用已停止的项目。假设
None
值。
import multiprocessing
class MyFancyClass:
def __init__(self, name):
self.name = name
def do_something(self):
proc_name = multiprocessing.current_process().name
print('Doing something fancy in {} for {}!'.format(proc_name, self.name))
def worker(q):
while True:
obj = q.get()
if obj is None:
break
obj.do_something()
if __name__ == '__main__':
queue = multiprocessing.Queue()
p = multiprocessing.Process(target=worker, args=(queue,))
p.start()
queue.put(MyFancyClass('Fancy Dan'))
queue.put(MyFancyClass('Frankie'))
# print(queue.qsize())
queue.put(None)
# Wait for the worker to finish
queue.close()
queue.join_thread()
p.join()
输出:
Doing something fancy in Process-1 for Fancy Dan!
Doing something fancy in Process-1 for Frankie!
我正在尝试使用队列学习多处理。
我想做的是在脚本运行时找出 when/how 到 "add more items to the queue"。
下面的脚本是我工作的基准:
import multiprocessing
class MyFancyClass:
def __init__(self, name):
self.name = name
def do_something(self):
proc_name = multiprocessing.current_process().name
print('Doing something fancy in {} for {}!'.format(
proc_name, self.name))
def worker(q):
obj = q.get()
obj.do_something()
if __name__ == '__main__':
queue = multiprocessing.Queue()
p = multiprocessing.Process(target=worker, args=(queue,))
p.start()
queue.put(MyFancyClass('Fancy Dan'))
queue.put(MyFancyClass('Frankie'))
print(queue.qsize())
# Wait for the worker to finish
queue.close()
queue.join_thread()
p.join()
在第 26 行,Fancy Dan
注入有效,但 Frankie
部分无效。我能够确认 Frankie
确实进入了队列。我需要一个可以 "Check for more items" 并根据需要将它们插入队列的位置。如果没有更多项目存在,则在现有项目清除后关闭队列。
我该怎么做?
谢谢!
一种方法是将工作人员更改为
def worker(q):
while not q.empty():
obj = q.get()
obj.do_something()
您的原始代码的问题是工作人员 returns 在处理队列中的一项后。您需要某种循环逻辑。
此解决方案不完善,因为 empty() 不是 reliable。如果在向队列中添加更多项目之前队列变空,也会失败(该过程只会 return)。
我建议使用 Process Pool Executor。
Submit 非常接近您要找的内容。
让我们说清楚:
- 目标函数
worker(q)
在上述方案中只会被调用一次。在第一次调用时,该函数将暂停等待阻塞操作的结果q.get()
。它从queue
获取实例MyFancyClass('Fancy Dan')
,调用其do_something
方法并完成。 MyFancyClass('Frankie')
将被放入队列但不会进入进程,因为进程的目标函数已完成。- 其中一种方法是从队列中读取并等待 signal/marked 表明队列使用已停止的项目。假设
None
值。
import multiprocessing
class MyFancyClass:
def __init__(self, name):
self.name = name
def do_something(self):
proc_name = multiprocessing.current_process().name
print('Doing something fancy in {} for {}!'.format(proc_name, self.name))
def worker(q):
while True:
obj = q.get()
if obj is None:
break
obj.do_something()
if __name__ == '__main__':
queue = multiprocessing.Queue()
p = multiprocessing.Process(target=worker, args=(queue,))
p.start()
queue.put(MyFancyClass('Fancy Dan'))
queue.put(MyFancyClass('Frankie'))
# print(queue.qsize())
queue.put(None)
# Wait for the worker to finish
queue.close()
queue.join_thread()
p.join()
输出:
Doing something fancy in Process-1 for Fancy Dan!
Doing something fancy in Process-1 for Frankie!