当所有进程都试图从队列中获取并且队列为空时结束处理?
End processing when all processes are trying to get from the queue and the queue is empty?
我想设置一些进程来接收输入并处理它,而这个结果的结果是我想要处理的另一项任务。基本上每个任务都会产生零个或多个新任务(相同类型),最终所有任务都不会产生新任务。
我认为队列对此有好处,所以我有一个输入队列和一个结果队列来添加不会产生新内容的任务。在任何时候,队列都可能是空的,但如果另一个进程正在处理任务,则可以添加更多队列。
因此,我只希望它在所有进程同时尝试从输入队列中获取时结束。
我对 python 多处理和一般多处理都是全新的。
编辑以添加我的意思的基本概述:
class Consumer(Process):
def __init__(self, name):
super().__init__(name=name)
def run():
# This is where I would have the task try to get a new task off of the
# queue and then calculate the results and put them into the queue
# After which it would then try to get a new task and repeat
# If this an all other processes are trying to get and the queue is
# empty That is the only time I know that everything is complete and can
# continue
pass
def start_processing():
in_queue = Queue()
results_queue = Queue()
consumers = [Consumer(str(i)) for i in range(cpu_count())]
for i in consumers:
i.start()
# Wait for the above mentioned conditions to be true before continuing
JoinableQueue
的设计就是为了满足这个目的。加入 JoinableQueue
将阻塞,直到有任务在进行中。
您可以按如下方式使用它:主进程将生成一定数量的工作进程,并为它们分配 JoinableQueue
。工作进程将使用队列来生产和消费新任务。主进程将通过加入队列等待,直到没有更多任务在进行中。之后,它会终止工作进程并退出。
一个非常简单的例子(伪代码):
def consumer(queue):
for task in queue.get():
results = process_task(task)
if 'more_tasks' in results:
for new_task in results['more_tasks']:
queue.put(new_task)
# signal the queue that a task has been completed
queue.task_done()
def main():
queue = JoinableQueue()
processes = start_processes(consumer, queue)
for task in initial_tasks:
queue.put(task)
queue.join() # block until all work is done
terminate_processes(processes)
我想设置一些进程来接收输入并处理它,而这个结果的结果是我想要处理的另一项任务。基本上每个任务都会产生零个或多个新任务(相同类型),最终所有任务都不会产生新任务。
我认为队列对此有好处,所以我有一个输入队列和一个结果队列来添加不会产生新内容的任务。在任何时候,队列都可能是空的,但如果另一个进程正在处理任务,则可以添加更多队列。
因此,我只希望它在所有进程同时尝试从输入队列中获取时结束。
我对 python 多处理和一般多处理都是全新的。
编辑以添加我的意思的基本概述:
class Consumer(Process):
def __init__(self, name):
super().__init__(name=name)
def run():
# This is where I would have the task try to get a new task off of the
# queue and then calculate the results and put them into the queue
# After which it would then try to get a new task and repeat
# If this an all other processes are trying to get and the queue is
# empty That is the only time I know that everything is complete and can
# continue
pass
def start_processing():
in_queue = Queue()
results_queue = Queue()
consumers = [Consumer(str(i)) for i in range(cpu_count())]
for i in consumers:
i.start()
# Wait for the above mentioned conditions to be true before continuing
JoinableQueue
的设计就是为了满足这个目的。加入 JoinableQueue
将阻塞,直到有任务在进行中。
您可以按如下方式使用它:主进程将生成一定数量的工作进程,并为它们分配 JoinableQueue
。工作进程将使用队列来生产和消费新任务。主进程将通过加入队列等待,直到没有更多任务在进行中。之后,它会终止工作进程并退出。
一个非常简单的例子(伪代码):
def consumer(queue):
for task in queue.get():
results = process_task(task)
if 'more_tasks' in results:
for new_task in results['more_tasks']:
queue.put(new_task)
# signal the queue that a task has been completed
queue.task_done()
def main():
queue = JoinableQueue()
processes = start_processes(consumer, queue)
for task in initial_tasks:
queue.put(task)
queue.join() # block until all work is done
terminate_processes(processes)