Python multiprocessing.Queue 与 multiprocessing.manager().Queue()
Python multiprocessing.Queue vs multiprocessing.manager().Queue()
我有一个像这样的简单任务:
def worker(queue):
while True:
try:
_ = queue.get_nowait()
except Queue.Empty:
break
if __name__ == '__main__':
manager = multiprocessing.Manager()
# queue = multiprocessing.Queue()
queue = manager.Queue()
for i in range(5):
queue.put(i)
processes = []
for i in range(2):
proc = multiprocessing.Process(target=worker, args=(queue,))
processes.append(proc)
proc.start()
for proc in processes:
proc.join()
似乎 multiprocessing.Queue 可以完成我需要的所有工作,但另一方面,我看到了很多 manager().Queue() 的例子,无法理解我真正需要的是什么。看起来 Manager().Queue() 使用某种代理对象,但我不明白这些目的,因为 multiprocessing.Queue() 在没有任何代理对象的情况下做同样的工作。
那么,我的问题是:
1) multiprocessing.Queue 和 multiprocessing.manager().Queue() 返回的对象之间的真正区别是什么?
2) 我需要使用什么?
虽然我对这个主题的理解有限,但根据我所做的,我可以看出 multiprocessing.Queue() 和 multiprocessing.Manager() 之间的一个主要区别。Queue():
- multiprocessing.Queue() 是一个对象,而 multiprocessing.Manager().Queue() 是指向由 multiprocessing.Manager() 对象管理的共享队列的地址(代理)。
- 因此您不能将正常的 multiprocessing.Queue() 对象传递给 Pool 方法,因为它不能被 pickle。
- 此外 python doc 告诉我们在使用 multiprocessing.Queue() 时要特别注意,因为它可能会产生不良影响
Note When an object is put on a queue, the object is pickled and a background thread later flushes the pickled data to an underlying pipe. This has some consequences which are a little surprising, but should not cause any practical difficulties – if they really bother you then you can instead use a queue created with a manager.
After putting an object on an empty queue there may be an infinitesimal delay before the queue’s empty() method returns False and get_nowait() can return without raising Queue.Empty.
If multiple processes are enqueuing objects, it is possible for the objects to be received at the other end out-of-order. However, objects enqueued by the same process will always be in the expected order with respect to each other.
Warning As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe.
This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children.
Note that a queue created using a manager does not have this issue.
通过将队列设置为全局变量并在初始化时为所有进程设置它,可以将 multiprocessing.Queue() 与 Pool 一起使用:
queue = multiprocessing.Queue()
def initialize_shared(q):
global queue
queue=q
pool= Pool(nb_process,initializer=initialize_shared, initargs(queue,))
将创建具有正确共享队列的池进程,但我们可以争辩说 multiprocessing.Queue() 对象不是为此用途创建的。
另一方面,manager.Queue() 可以通过将其作为函数的正常参数传递来在池子进程之间共享。
在我看来,在任何情况下使用 multiprocessing.Manager().Queue() 都很好,而且不那么麻烦。使用管理器可能会有一些缺点,但我不知道。
我最近遇到了 Manager().Queue()
的问题,当 SyncManager
对象 - return 由 multiprocessing.Manager()
编辑 - 似乎死了,它管理的队列阻塞永远(即使 *_nowait()
)。
我不确定原因,或者如果 SyncManager 真的死了,我唯一的线索是我从一个 class 实例调用 multiprocessing.Manager()
,它有 __del__()
,它记录调用它的进程,我可以看到这是从 SyncManager 进程调用的 __del__()
。
这意味着我的对象在 SyncManager 进程中有一个副本,并且已被垃圾回收。这可能意味着只有我的对象被删除,SyncManager 没问题,但我确实看到相应的队列变得无响应与 SyncManager 进程中的 __del__()
调用相关。
我不知道我的对象如何在 SyncManager 进程中结束。我通常抽出 50-200 名经理——有些有重叠的生命周期,有些则没有——直到我看到这个问题。对于解释器退出时存在的对象,__del__()
不会被调用,而且我通常不会看到 SyncManager 对象被 __del__()
中的这个日志杀死,只是偶尔出现。可能当出现问题时,SyncManager 对象首先处理它的对象,然后解释器才会退出,这就是为什么我偶尔会看到 __del__()
调用。
我确实看到我的队列变得无响应,即使在我没有看到从 SyncManager 调用 __del__()
的情况下也是如此。
我也看到了 SyncManager "die",没有引起进一步的问题。
"unresponsive" 我的意思是:
queue.get(timeout=1)
queue.put(timeout=1)
从不return.
queue.get_nowait(timeout=1)
queue.put_nowait(timeout=1)
从不return.
这变得有点复杂,然后我本来想要的,但我把细节放在里面,以防万一它能帮助别人。
我之前用了很长时间Manager().Queue()
没有任何问题。我怀疑要么是实例化了很多管理器对象导致了问题,要么是实例化了很多管理器导致了一个一直存在的问题。
我用Python 3.6.5
.
我有一个像这样的简单任务:
def worker(queue):
while True:
try:
_ = queue.get_nowait()
except Queue.Empty:
break
if __name__ == '__main__':
manager = multiprocessing.Manager()
# queue = multiprocessing.Queue()
queue = manager.Queue()
for i in range(5):
queue.put(i)
processes = []
for i in range(2):
proc = multiprocessing.Process(target=worker, args=(queue,))
processes.append(proc)
proc.start()
for proc in processes:
proc.join()
似乎 multiprocessing.Queue 可以完成我需要的所有工作,但另一方面,我看到了很多 manager().Queue() 的例子,无法理解我真正需要的是什么。看起来 Manager().Queue() 使用某种代理对象,但我不明白这些目的,因为 multiprocessing.Queue() 在没有任何代理对象的情况下做同样的工作。
那么,我的问题是:
1) multiprocessing.Queue 和 multiprocessing.manager().Queue() 返回的对象之间的真正区别是什么?
2) 我需要使用什么?
虽然我对这个主题的理解有限,但根据我所做的,我可以看出 multiprocessing.Queue() 和 multiprocessing.Manager() 之间的一个主要区别。Queue():
- multiprocessing.Queue() 是一个对象,而 multiprocessing.Manager().Queue() 是指向由 multiprocessing.Manager() 对象管理的共享队列的地址(代理)。
- 因此您不能将正常的 multiprocessing.Queue() 对象传递给 Pool 方法,因为它不能被 pickle。
- 此外 python doc 告诉我们在使用 multiprocessing.Queue() 时要特别注意,因为它可能会产生不良影响
Note When an object is put on a queue, the object is pickled and a background thread later flushes the pickled data to an underlying pipe. This has some consequences which are a little surprising, but should not cause any practical difficulties – if they really bother you then you can instead use a queue created with a manager. After putting an object on an empty queue there may be an infinitesimal delay before the queue’s empty() method returns False and get_nowait() can return without raising Queue.Empty. If multiple processes are enqueuing objects, it is possible for the objects to be received at the other end out-of-order. However, objects enqueued by the same process will always be in the expected order with respect to each other.
Warning As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe. This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children. Note that a queue created using a manager does not have this issue.
通过将队列设置为全局变量并在初始化时为所有进程设置它,可以将 multiprocessing.Queue() 与 Pool 一起使用:
queue = multiprocessing.Queue()
def initialize_shared(q):
global queue
queue=q
pool= Pool(nb_process,initializer=initialize_shared, initargs(queue,))
将创建具有正确共享队列的池进程,但我们可以争辩说 multiprocessing.Queue() 对象不是为此用途创建的。
另一方面,manager.Queue() 可以通过将其作为函数的正常参数传递来在池子进程之间共享。
在我看来,在任何情况下使用 multiprocessing.Manager().Queue() 都很好,而且不那么麻烦。使用管理器可能会有一些缺点,但我不知道。
我最近遇到了 Manager().Queue()
的问题,当 SyncManager
对象 - return 由 multiprocessing.Manager()
编辑 - 似乎死了,它管理的队列阻塞永远(即使 *_nowait()
)。
我不确定原因,或者如果 SyncManager 真的死了,我唯一的线索是我从一个 class 实例调用 multiprocessing.Manager()
,它有 __del__()
,它记录调用它的进程,我可以看到这是从 SyncManager 进程调用的 __del__()
。
这意味着我的对象在 SyncManager 进程中有一个副本,并且已被垃圾回收。这可能意味着只有我的对象被删除,SyncManager 没问题,但我确实看到相应的队列变得无响应与 SyncManager 进程中的 __del__()
调用相关。
我不知道我的对象如何在 SyncManager 进程中结束。我通常抽出 50-200 名经理——有些有重叠的生命周期,有些则没有——直到我看到这个问题。对于解释器退出时存在的对象,__del__()
不会被调用,而且我通常不会看到 SyncManager 对象被 __del__()
中的这个日志杀死,只是偶尔出现。可能当出现问题时,SyncManager 对象首先处理它的对象,然后解释器才会退出,这就是为什么我偶尔会看到 __del__()
调用。
我确实看到我的队列变得无响应,即使在我没有看到从 SyncManager 调用 __del__()
的情况下也是如此。
我也看到了 SyncManager "die",没有引起进一步的问题。
"unresponsive" 我的意思是:
queue.get(timeout=1)
queue.put(timeout=1)
从不return.
queue.get_nowait(timeout=1)
queue.put_nowait(timeout=1)
从不return.
这变得有点复杂,然后我本来想要的,但我把细节放在里面,以防万一它能帮助别人。
我之前用了很长时间Manager().Queue()
没有任何问题。我怀疑要么是实例化了很多管理器对象导致了问题,要么是实例化了很多管理器导致了一个一直存在的问题。
我用Python 3.6.5
.