如何在脚本运行时向多处理队列添加更多项目

how to add more items to a multiprocessing queue while script in motion

我正在尝试使用队列学习多处理。

我想做的是在脚本运行时找出 when/how 到 "add more items to the queue"。

下面的脚本是我工作的基准:

import multiprocessing


class MyFancyClass:

    def __init__(self, name):
        self.name = name

    def do_something(self):
        proc_name = multiprocessing.current_process().name
        print('Doing something fancy in {} for {}!'.format(
            proc_name, self.name))


def worker(q):
    obj = q.get()
    obj.do_something()


if __name__ == '__main__':
    queue = multiprocessing.Queue()

    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()

    queue.put(MyFancyClass('Fancy Dan'))
    queue.put(MyFancyClass('Frankie'))
    print(queue.qsize())

    # Wait for the worker to finish
    queue.close()
    queue.join_thread()
    p.join()

在第 26 行,Fancy Dan 注入有效,但 Frankie 部分无效。我能够确认 Frankie 确实进入了队列。我需要一个可以 "Check for more items" 并根据需要将它们插入队列的位置。如果没有更多项目存在,则在现有项目清除后关闭队列。

我该怎么做?

谢谢!

一种方法是将工作人员更改为

def worker(q):
    while not q.empty():
        obj = q.get()
        obj.do_something()

您的原始代码的问题是工作人员 returns 在处理队列中的一项后。您需要某种循环逻辑。

此解决方案不完善,因为 empty() 不是 reliable。如果在向队列中添加更多项目之前队列变空,也会失败(该过程只会 return)。

我建议使用 Process Pool Executor

Submit 非常接近您要找的内容。

让我们说清楚:

  • 目标函数worker(q)在上述方案中只会被调用一次。在第一次调用时,该函数将暂停等待阻塞操作的结果 q.get()。它从 queue 获取实例 MyFancyClass('Fancy Dan'),调用其 do_something 方法并完成。
  • MyFancyClass('Frankie') 将被放入队列但不会进入进程,因为进程的目标函数已完成。
  • 其中一种方法是从队列中读取并等待 signal/marked 表明队列使用已停止的项目。假设 None 值。

import multiprocessing


class MyFancyClass:

    def __init__(self, name):
        self.name = name

    def do_something(self):
        proc_name = multiprocessing.current_process().name
        print('Doing something fancy in {} for {}!'.format(proc_name, self.name))


def worker(q):
    while True:
        obj = q.get()
        if obj is None:
            break
        obj.do_something()


if __name__ == '__main__':
    queue = multiprocessing.Queue()

    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()

    queue.put(MyFancyClass('Fancy Dan'))
    queue.put(MyFancyClass('Frankie'))
    # print(queue.qsize())
    queue.put(None)

    # Wait for the worker to finish
    queue.close()
    queue.join_thread()
    p.join()

输出:

Doing something fancy in Process-1 for Fancy Dan!
Doing something fancy in Process-1 for Frankie!