python 的 multiprocessing.Queue 的可迭代实现中的继承
Inheritance in iterable implementation of python's multiprocessing.Queue
我发现缺少 python 的 multiprocessing.Queue
的默认实现,因为它不像任何其他集合那样可迭代。所以我开始努力创建它的 'subclass',添加功能。正如您从下面的代码中看到的,它不是一个合适的子 class,因为 multiprocess.Queue
不是不是一个直接的 class 本身,而是一个工厂函数,真正的底层 class 是 multiprocess.queues.Queue
。我没有理解也没有努力去模仿工厂功能,所以我可以正确地从 class 继承,所以我只是让新的 class 从工厂并将其视为超级class。这是代码;
from multiprocessing import Queue, Value, Lock
import queue
class QueueClosed(Exception):
pass
class IterableQueue:
def __init__(self, maxsize=0):
self.closed = Value('b', False)
self.close_lock = Lock()
self.queue = Queue(maxsize)
def close(self):
with self.close_lock:
self.closed.value = True
self.queue.close()
def put(self, elem, block=True, timeout=None):
with self.close_lock:
if self.closed.value:
raise QueueClosed()
else:
self.queue.put(elem, block, timeout)
def put_nowait(self, elem):
self.put(elem, False)
def get(self, block=True):
if not block:
return self.queue.get_nowait()
elif self.closed.value:
try:
return self.queue.get_nowait()
except queue.Empty:
return None
else:
val = None
while not self.closed.value:
try:
val = self.queue.get_nowait()
break
except queue.Empty:
pass
return val
def get_nowait(self):
return self.queue.get_nowait()
def join_thread(self):
return self.queue.join_thread()
def __iter__(self):
return self
def __next__(self):
val = self.get()
if val == None:
raise StopIteration()
else:
return val
def __enter__(self):
return self
def __exit__(self, *args):
self.close()
这使我可以像正常 multiprocessing.Queue
一样实例化一个 IterableQueue
对象,像正常一样将元素放入其中,然后在子消费者内部,像这样简单地循环它;
from iterable_queue import IterableQueue
from multiprocessing import Process, cpu_count
import os
def fib(n):
if n < 2:
return n
return fib(n-1) + fib(n-2)
def consumer(queue):
print(f"[{os.getpid()}] Consuming")
for i in queue:
print(f"[{os.getpid()}] < {i}")
n = fib(i)
print(f"[{os.getpid()}] {i} > {n}")
print(f"[{os.getpid()}] Closing")
def producer():
print("Enqueueing")
with IterableQueue() as queue:
procs = [Process(target=consumer, args=(queue,)) for _ in range(cpu_count())]
[p.start() for p in procs]
[queue.put(i) for i in range(36)]
print("Finished")
if __name__ == "__main__":
producer()
它几乎可以无缝运行;一旦队列关闭,消费者就退出循环,但只有在耗尽所有剩余元素之后。但是,我对缺少继承方法感到不满意。为了模仿实际的继承行为,我尝试将以下元函数调用添加到 class;
def __getattr__(self, name):
if name in self.__dict__:
return self.__dict__[name]
else:
return self.queue.__getattr__[name]
但是,当 IterableQueue
class 的实例在子 multiprocessing.Process
线程中被操作时,这会失败,因为 class 的 __dict__
属性 未保留在其中。我试图通过用 multiprocessing.Manager().dict()
替换 class 的默认 __dict__
来以一种 hacky 的方式解决这个问题,就像这样;
def __init__(self, maxsize=0):
self.closed = Value('b', False)
self.close_lock = Lock()
self.queue = Queue(maxsize)
self.__dict__ = Manager().dict(self.__dict__)
但是在这样做时,我收到一条错误消息 RuntimeError: Synchronized objects should only be shared between processes through inheritance
。所以我的问题是,我应该如何正确地从 Queue class 继承,以便 subclass 继承对其所有属性的访问权?此外,当队列为空但未关闭时,消费者都处于繁忙的循环中,而不是真正的 IO 块,占用了宝贵的 cpu 资源。如果您对我可能 运行 使用此代码的并发性和竞争条件问题有任何建议,或者我可能会如何解决繁忙的循环问题,我也愿意采纳其中的建议。
基于MisterMiyagi提供的代码,我创建了这个通用的IterableQueue
class,它可以接受任意输入,正确阻塞,并且不会在队列关闭时挂起;
from multiprocessing.queues import Queue
from multiprocessing import get_context
class QueueClosed(Exception):
pass
class IterableQueue(Queue):
def __init__(self, maxsize=0, *, ctx=None):
super().__init__(
maxsize=maxsize,
ctx=ctx if ctx is not None else get_context()
)
def close(self):
super().put((None, False))
super().close()
def __iter__(self):
return self
def __next__(self):
try:
return self.get()
except QueueClosed:
raise StopIteration
def get(self, *args, **kwargs):
result, is_open = super().get(*args, **kwargs)
if not is_open:
super().put((None, False))
raise QueueClosed
return result
def put(self, val, *args, **kwargs):
super().put((val, True), *args, **kwargs)
def __enter__(self):
return self
def __exit__(self, *args):
self.close()
multiprocess.Queue
包装器仅用于 use the default context。
def Queue(self, maxsize=0):
'''Returns a queue object'''
from .queues import Queue
return Queue(maxsize, ctx=self.get_context())
继承时,可以在__init__
方法中复制这个。这允许您继承整个 Queue
行为。您只需要添加迭代器方法:
from multiprocessing.queues import Queue
from multiprocessing import get_context
class IterableQueue(Queue):
"""
``multiprocessing.Queue`` that can be iterated to ``get`` values
:param sentinel: signal that no more items will be received
"""
def __init__(self, maxsize=0, *, ctx=None, sentinel=None):
self.sentinel = sentinel
super().__init__(
maxsize=maxsize,
ctx=ctx if ctx is not None else get_context()
)
def close(self):
self.put(self.sentinel)
# wait until buffer is flushed...
while self._buffer:
time.sleep(0.01)
# before shutting down the sender
super().close()
def __iter__(self):
return self
def __next__(self):
result = self.get()
if result == self.sentinel:
# re-queue sentinel for other listeners
self.put(result)
raise StopIteration
return result
请注意,表示队列末尾的 sentinel
是通过相等性进行比较的,因为标识不会跨进程保留。经常使用的 queue.Queue
哨兵 object()
不能正常工作。
我发现缺少 python 的 multiprocessing.Queue
的默认实现,因为它不像任何其他集合那样可迭代。所以我开始努力创建它的 'subclass',添加功能。正如您从下面的代码中看到的,它不是一个合适的子 class,因为 multiprocess.Queue
不是不是一个直接的 class 本身,而是一个工厂函数,真正的底层 class 是 multiprocess.queues.Queue
。我没有理解也没有努力去模仿工厂功能,所以我可以正确地从 class 继承,所以我只是让新的 class 从工厂并将其视为超级class。这是代码;
from multiprocessing import Queue, Value, Lock
import queue
class QueueClosed(Exception):
pass
class IterableQueue:
def __init__(self, maxsize=0):
self.closed = Value('b', False)
self.close_lock = Lock()
self.queue = Queue(maxsize)
def close(self):
with self.close_lock:
self.closed.value = True
self.queue.close()
def put(self, elem, block=True, timeout=None):
with self.close_lock:
if self.closed.value:
raise QueueClosed()
else:
self.queue.put(elem, block, timeout)
def put_nowait(self, elem):
self.put(elem, False)
def get(self, block=True):
if not block:
return self.queue.get_nowait()
elif self.closed.value:
try:
return self.queue.get_nowait()
except queue.Empty:
return None
else:
val = None
while not self.closed.value:
try:
val = self.queue.get_nowait()
break
except queue.Empty:
pass
return val
def get_nowait(self):
return self.queue.get_nowait()
def join_thread(self):
return self.queue.join_thread()
def __iter__(self):
return self
def __next__(self):
val = self.get()
if val == None:
raise StopIteration()
else:
return val
def __enter__(self):
return self
def __exit__(self, *args):
self.close()
这使我可以像正常 multiprocessing.Queue
一样实例化一个 IterableQueue
对象,像正常一样将元素放入其中,然后在子消费者内部,像这样简单地循环它;
from iterable_queue import IterableQueue
from multiprocessing import Process, cpu_count
import os
def fib(n):
if n < 2:
return n
return fib(n-1) + fib(n-2)
def consumer(queue):
print(f"[{os.getpid()}] Consuming")
for i in queue:
print(f"[{os.getpid()}] < {i}")
n = fib(i)
print(f"[{os.getpid()}] {i} > {n}")
print(f"[{os.getpid()}] Closing")
def producer():
print("Enqueueing")
with IterableQueue() as queue:
procs = [Process(target=consumer, args=(queue,)) for _ in range(cpu_count())]
[p.start() for p in procs]
[queue.put(i) for i in range(36)]
print("Finished")
if __name__ == "__main__":
producer()
它几乎可以无缝运行;一旦队列关闭,消费者就退出循环,但只有在耗尽所有剩余元素之后。但是,我对缺少继承方法感到不满意。为了模仿实际的继承行为,我尝试将以下元函数调用添加到 class;
def __getattr__(self, name):
if name in self.__dict__:
return self.__dict__[name]
else:
return self.queue.__getattr__[name]
但是,当 IterableQueue
class 的实例在子 multiprocessing.Process
线程中被操作时,这会失败,因为 class 的 __dict__
属性 未保留在其中。我试图通过用 multiprocessing.Manager().dict()
替换 class 的默认 __dict__
来以一种 hacky 的方式解决这个问题,就像这样;
def __init__(self, maxsize=0):
self.closed = Value('b', False)
self.close_lock = Lock()
self.queue = Queue(maxsize)
self.__dict__ = Manager().dict(self.__dict__)
但是在这样做时,我收到一条错误消息 RuntimeError: Synchronized objects should only be shared between processes through inheritance
。所以我的问题是,我应该如何正确地从 Queue class 继承,以便 subclass 继承对其所有属性的访问权?此外,当队列为空但未关闭时,消费者都处于繁忙的循环中,而不是真正的 IO 块,占用了宝贵的 cpu 资源。如果您对我可能 运行 使用此代码的并发性和竞争条件问题有任何建议,或者我可能会如何解决繁忙的循环问题,我也愿意采纳其中的建议。
基于MisterMiyagi提供的代码,我创建了这个通用的IterableQueue
class,它可以接受任意输入,正确阻塞,并且不会在队列关闭时挂起;
from multiprocessing.queues import Queue
from multiprocessing import get_context
class QueueClosed(Exception):
pass
class IterableQueue(Queue):
def __init__(self, maxsize=0, *, ctx=None):
super().__init__(
maxsize=maxsize,
ctx=ctx if ctx is not None else get_context()
)
def close(self):
super().put((None, False))
super().close()
def __iter__(self):
return self
def __next__(self):
try:
return self.get()
except QueueClosed:
raise StopIteration
def get(self, *args, **kwargs):
result, is_open = super().get(*args, **kwargs)
if not is_open:
super().put((None, False))
raise QueueClosed
return result
def put(self, val, *args, **kwargs):
super().put((val, True), *args, **kwargs)
def __enter__(self):
return self
def __exit__(self, *args):
self.close()
multiprocess.Queue
包装器仅用于 use the default context。
def Queue(self, maxsize=0):
'''Returns a queue object'''
from .queues import Queue
return Queue(maxsize, ctx=self.get_context())
继承时,可以在__init__
方法中复制这个。这允许您继承整个 Queue
行为。您只需要添加迭代器方法:
from multiprocessing.queues import Queue
from multiprocessing import get_context
class IterableQueue(Queue):
"""
``multiprocessing.Queue`` that can be iterated to ``get`` values
:param sentinel: signal that no more items will be received
"""
def __init__(self, maxsize=0, *, ctx=None, sentinel=None):
self.sentinel = sentinel
super().__init__(
maxsize=maxsize,
ctx=ctx if ctx is not None else get_context()
)
def close(self):
self.put(self.sentinel)
# wait until buffer is flushed...
while self._buffer:
time.sleep(0.01)
# before shutting down the sender
super().close()
def __iter__(self):
return self
def __next__(self):
result = self.get()
if result == self.sentinel:
# re-queue sentinel for other listeners
self.put(result)
raise StopIteration
return result
请注意,表示队列末尾的 sentinel
是通过相等性进行比较的,因为标识不会跨进程保留。经常使用的 queue.Queue
哨兵 object()
不能正常工作。