Inheritance in iterable implementation of python's multiprocessing.Queue

I found the default implementation of python's multiprocessing.Queue lacking, in that it is not iterable like any other collection. So I set about creating a 'subclass' of it that adds the feature. As you can see from the code below, it is not a proper subclass, because multiprocessing.Queue is not a plain class itself but a factory function, and the real underlying class is multiprocessing.queues.Queue. I neither understood nor wanted to put in the effort needed to mimic the factory function just so I could inherit from the class properly, so I simply had the new class create an instance from the factory and treat it as its superclass. Here is the code;

from multiprocessing import Queue, Value, Lock
import queue

class QueueClosed(Exception):
    pass

class IterableQueue:
    def __init__(self, maxsize=0):
        self.closed = Value('b', False)
        self.close_lock = Lock()
        self.queue = Queue(maxsize)

    def close(self):
        with self.close_lock:
            self.closed.value = True
            self.queue.close()

    def put(self, elem, block=True, timeout=None):
        with self.close_lock:
            if self.closed.value:
                raise QueueClosed()
            else:
                self.queue.put(elem, block, timeout)

    def put_nowait(self, elem):
        self.put(elem, False)

    def get(self, block=True):
        if not block:
            return self.queue.get_nowait()
        elif self.closed.value:
            try:
                return self.queue.get_nowait()
            except queue.Empty:
                return None
        else:
            val = None
            # busy-wait: repeatedly poll the queue until an item arrives
            # or the queue is closed (this is the busy loop discussed below)
            while not self.closed.value:
                try:
                    val = self.queue.get_nowait()
                    break
                except queue.Empty:
                    pass
            return val

    def get_nowait(self):
        return self.queue.get_nowait()

    def join_thread(self):
        return self.queue.join_thread()

    def __iter__(self):
        return self

    def __next__(self):
        val = self.get()
        if val is None:
            raise StopIteration()
        else:
            return val

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

This lets me instantiate an IterableQueue object just like a normal multiprocessing.Queue, put elements into it as usual, and then, inside child consumers, simply loop over it like this;

from iterable_queue import IterableQueue
from multiprocessing import Process, cpu_count
import os

def fib(n):
    if n < 2:
        return n
    return fib(n-1) + fib(n-2)

def consumer(queue):
    print(f"[{os.getpid()}] Consuming")
    for i in queue:
        print(f"[{os.getpid()}] < {i}")
        n = fib(i)
        print(f"[{os.getpid()}] {i} > {n}")
    print(f"[{os.getpid()}] Closing")

def producer():
    print("Enqueueing")
    with IterableQueue() as queue:
        procs = [Process(target=consumer, args=(queue,)) for _ in range(cpu_count())]
        [p.start() for p in procs]
        [queue.put(i) for i in range(36)]
    print("Finished")

if __name__ == "__main__":
    producer()

It works almost seamlessly; the consumers exit the loop once the queue has been closed, but only after exhausting all of its remaining elements. However, I was unsatisfied with the lack of inherited methods. In an attempt to mimic actual inheritance behaviour, I tried adding the following meta function to the class;

def __getattr__(self, name):
    if name in self.__dict__:
        return self.__dict__[name]
    else:
        return getattr(self.queue, name)

However, this fails when the instance of the IterableQueue class is manipulated inside a child multiprocessing.Process, because the class's __dict__ attribute is not preserved within it. I attempted to remedy this in a hacky manner by replacing the class's default __dict__ with a multiprocessing.Manager().dict() (with Manager also imported from multiprocessing), like so;

def __init__(self, maxsize=0):
    self.closed = Value('b', False)
    self.close_lock = Lock()
    self.queue = Queue(maxsize)
    self.__dict__ = Manager().dict(self.__dict__)

But in doing so I get the error RuntimeError: Synchronized objects should only be shared between processes through inheritance. So my question is: how should I go about properly inheriting from the Queue class so that the subclass gains access to all of its attributes? In addition, while the queue is empty but not closed, the consumers sit in a busy loop rather than a true IO block, eating up valuable cpu time. If you have any suggestions regarding concurrency or race-condition problems I might run into with this code, or on how I might solve the busy-loop problem, I would gladly take those as well.
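One mitigation I have considered for the busy loop, sketched below but untested, is to block on the underlying queue with a short timeout instead of spinning on get_nowait(), so the consumer sleeps in the OS between polls while still noticing a close reasonably quickly;

def get(self, block=True):
    if not block:
        return self.queue.get_nowait()
    # block for up to 100 ms at a time instead of spinning,
    # re-checking the closed flag between attempts
    while not self.closed.value:
        try:
            return self.queue.get(block=True, timeout=0.1)
        except queue.Empty:
            pass
    # the queue was closed: drain any remaining element, else signal exhaustion
    try:
        return self.queue.get_nowait()
    except queue.Empty:
        return None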


Based on the code provided by MisterMiyagi, I created this generic IterableQueue class, which can accept arbitrary input, blocks properly, and does not hang on queue close;

from multiprocessing.queues import Queue
from multiprocessing import get_context

class QueueClosed(Exception):
    pass

class IterableQueue(Queue):
    def __init__(self, maxsize=0, *, ctx=None):
        super().__init__(
            maxsize=maxsize,
            ctx=ctx if ctx is not None else get_context()
        )

    def close(self):
        # enqueue a (None, False) end marker so consumers know no more items will come
        super().put((None, False))
        super().close()

    def __iter__(self):
        return self

    def __next__(self):
        try:
            return self.get()
        except QueueClosed:
            raise StopIteration

    def get(self, *args, **kwargs):
        # items are stored as (value, is_open) pairs; is_open == False marks the end
        result, is_open = super().get(*args, **kwargs)
        if not is_open:
            # re-queue the end marker so other consumers see it too
            super().put((None, False))
            raise QueueClosed
        return result

    def put(self, val, *args, **kwargs):
        super().put((val, True), *args, **kwargs)

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()
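As a rough usage sketch (not part of the original example), the earlier producer/consumer pattern carries over unchanged: close() enqueues the closing marker, each consumer re-queues it for the others, and every worker exits its loop;

from multiprocessing import Process, cpu_count

def consumer(queue):
    for item in queue:
        print(item)

if __name__ == "__main__":
    queue = IterableQueue()
    consumers = [Process(target=consumer, args=(queue,)) for _ in range(cpu_count())]
    for p in consumers:
        p.start()
    with queue:                 # __exit__ calls close(), enqueueing the closing marker
        for i in range(36):
            queue.put(i)
    for p in consumers:
        p.join()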

The multiprocessing.Queue wrapper only serves to use the default context:

def Queue(self, maxsize=0):
    '''Returns a queue object'''
    from .queues import Queue
    return Queue(maxsize, ctx=self.get_context())

When subclassing, you can replicate this in the __init__ method. This allows you to inherit the entire Queue behaviour. You only need to add the iterator methods on top:

import time

from multiprocessing.queues import Queue
from multiprocessing import get_context


class IterableQueue(Queue):
    """
    ``multiprocessing.Queue`` that can be iterated to ``get`` values

    :param sentinel: signal that no more items will be received
    """
    def __init__(self, maxsize=0, *, ctx=None, sentinel=None):
        self.sentinel = sentinel
        super().__init__(
            maxsize=maxsize,
            ctx=ctx if ctx is not None else get_context()
        )

    def close(self):
        self.put(self.sentinel)
        # wait until buffer is flushed...
        while self._buffer:
            time.sleep(0.01)
        # before shutting down the sender
        super().close()

    def __iter__(self):
        return self

    def __next__(self):
        result = self.get()
        if result == self.sentinel:
            # re-queue sentinel for other listeners
            self.put(result)
            raise StopIteration
        return result

Note that the sentinel which signals the end of the queue is compared by equality, because identity is not preserved across processes. The often-used queue.Queue sentinel object() does not work properly here.
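To illustrate the point (my own example, not from the answer above): anything put on the queue goes through a pickle round trip on its way to another process, so an object() sentinel arrives as a different object and an identity check fails, whereas None survives as the same singleton;

import pickle

sentinel = object()
received = pickle.loads(pickle.dumps(sentinel))
print(received is sentinel)                       # False: identity is lost across the round trip
print(pickle.loads(pickle.dumps(None)) is None)   # True: None is a singleton in every process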