在不同模块之间共享一个 Queue 实例
Sharing a Queue instance between different modules
我是 Python 的新手,我想在不同模块中创建的 threads/processes 之间创建一个 'global static variable',我的线程安全和进程安全队列。
我从 doc 中读到全局变量的概念是使用第三个模块创建的,我将其称为 cfg,它定义并初始化我的全局队列。
我在我的模块之间共享此对象的实例时遇到问题,因为我试图在从 cfg 模块导入的共享队列上打印 repr() 函数,在导入它的其他模块中, 它表明它们是不同的实例。似乎每次我尝试导入一个模块时,都会创建一个新实例并将其传递给导入它的模块。
Main.py:
import GatewayManager
if __name__ == '__main__':
GatewayManager.initialize()
doSomething()
GatewayManager.py:
import multiprocessing
import queue
import threading
def initialize():
# Multiprocessing or Threading
global isMonoCPU
isMonoCPU = multiprocessing.cpu_count() == 1
global sharedQueue
sharedQueue = multiprocessing.Queue() if not isMonoCPU else queue.Queue()
print("gateway: ", sharedQueue.__repr__())
otherModules.py:
import GatewayManager
# Some module write on the queue
GatewayManager.sharedQueue.put(variable)
# Some read from the queue
GatewayManager.sharedQueue.get()
print("driver: ", GatewayManager.sharedQueue.__repr__())
A multiprocessing.Queue
在创建它的进程(我们称之为 "Parent")和由 Parent 创建的进程(我们称之为 "Children")之间共享。
下面是一些没有这种关系的进程的例子:
$ python myprogram.py &
$ python myprogram.py &
shell就是这两个Children的Parent。 shell 并没有创建 multiprocessing.Queue
,因此它不会被两个 children 共享。相反,他们将各自创建自己的。这可以与他们的children共享,但不能与彼此共享。
您可以很容易地观察到这种行为:
$ cat queuedemo.py
from time import sleep
from os import getpid
from sys import argv
from multiprocessing import Queue
q = Queue()
if argv[1:]:
q.put(getpid())
sleep(60)
else:
print(getpid(), q.get())
exarkun@baryon:/tmp/queue$ python queuedemo.py foo & python queuedemo.py
[1] 28249
第二个进程从未设法从队列中读取任何内容。但是,如果你给两个进程 Parent-Child 关系...
$ cat queuedemo.py
from os import getpid
from multiprocessing import Queue
from multiprocessing.process import Process
q = Queue()
q.put(getpid())
def child():
print(getpid(), q.get())
p = Process(target=child)
p.start()
p.join()
exarkun@baryon:/tmp/queue$ python queuedemo.py
(28469, 28467)
exarkun@baryon:/tmp/queue$
注意 q.get()
调用成功,并且放入队列的 pid 与取出它的进程的 pid 不同。
有些必要,这也扩展到具有 Parent-Descendant 和兄弟关系的进程。
所以:
- 全局变量仅在单个进程中共享
- 多处理模块提供了在彼此正确关联的进程之间共享状态的工具。
如果您想在没有这种关系的进程之间共享状态,还有多种其他选择 - 最好的选择更多地取决于您必须共享哪种状态以及您的共享模式是什么样的(您的问题中都没有包含这两个)。
这里:
# GatewayManager.py:
...
def initialize():
global sharedQueue
# ...
sharedQueue = multiprocessing.Queue()
# ...
您的 GatewayManager
模块在调用 initialize()
函数之前没有 sharedQueue
属性。因此,如果任何其他模块试图在 GatewayManager.initialize()
被调用,那么你当然会得到这个错误。由于 GatewayManager.initialize()
会在每次调用时盲目地重新绑定 sharedQueue
,如果您从另一个模块再次调用它,那么您将丢失已经创建的队列并获得一个新队列。
你想要的是确保你的 sharedqueue 只被创建一次,并且无论发生什么都会被创建。这里的解决方案(好吧,至少是一个解决方案 - 但它是一个已知的工作解决方案)是通过函数代理所有 GatewayManager.sharedQueue.whatever
访问,这些函数将在需要时负责初始化队列。
# gateway_manager.py
class _QueueProxy(object):
def __init__(self):
self._queueimp = None
@property
def _queue(self):
if self._queueimp is None:
isMonoCPU = multiprocessing.cpu_count() == 1
self._queueimp = queue.Queue() if isMonoCPU else multiprocessing.Queue()
return self._queueimp
def get(self, *args, **kw):
return self._queue.get(*args, **kw)
def put(self, *args, **kw):
return self._queue.put(*args, **kw)
# etc... only expose public methods and attributes of course
# and now our `shared_queue` instance
shared_queue = _QueueProxy()
现在您可以安全地(好吧几乎 - 队列创建不是原子的,所以您可能有竞争条件)从任何模块使用 gateway_manager.shared_queue
而无需关心初始化。
当然,如果您有两个不同的进程(我在这里不是在谈论 multiprocessing.Process
),您仍然会有两个不同的队列,但我假设您已经理解了这一点(如果不理解,请阅读 Jean-Paul 的回答)。
我是 Python 的新手,我想在不同模块中创建的 threads/processes 之间创建一个 'global static variable',我的线程安全和进程安全队列。 我从 doc 中读到全局变量的概念是使用第三个模块创建的,我将其称为 cfg,它定义并初始化我的全局队列。 我在我的模块之间共享此对象的实例时遇到问题,因为我试图在从 cfg 模块导入的共享队列上打印 repr() 函数,在导入它的其他模块中, 它表明它们是不同的实例。似乎每次我尝试导入一个模块时,都会创建一个新实例并将其传递给导入它的模块。
Main.py:
import GatewayManager
if __name__ == '__main__':
GatewayManager.initialize()
doSomething()
GatewayManager.py:
import multiprocessing
import queue
import threading
def initialize():
# Multiprocessing or Threading
global isMonoCPU
isMonoCPU = multiprocessing.cpu_count() == 1
global sharedQueue
sharedQueue = multiprocessing.Queue() if not isMonoCPU else queue.Queue()
print("gateway: ", sharedQueue.__repr__())
otherModules.py:
import GatewayManager
# Some module write on the queue
GatewayManager.sharedQueue.put(variable)
# Some read from the queue
GatewayManager.sharedQueue.get()
print("driver: ", GatewayManager.sharedQueue.__repr__())
A multiprocessing.Queue
在创建它的进程(我们称之为 "Parent")和由 Parent 创建的进程(我们称之为 "Children")之间共享。
下面是一些没有这种关系的进程的例子:
$ python myprogram.py &
$ python myprogram.py &
shell就是这两个Children的Parent。 shell 并没有创建 multiprocessing.Queue
,因此它不会被两个 children 共享。相反,他们将各自创建自己的。这可以与他们的children共享,但不能与彼此共享。
您可以很容易地观察到这种行为:
$ cat queuedemo.py
from time import sleep
from os import getpid
from sys import argv
from multiprocessing import Queue
q = Queue()
if argv[1:]:
q.put(getpid())
sleep(60)
else:
print(getpid(), q.get())
exarkun@baryon:/tmp/queue$ python queuedemo.py foo & python queuedemo.py
[1] 28249
第二个进程从未设法从队列中读取任何内容。但是,如果你给两个进程 Parent-Child 关系...
$ cat queuedemo.py
from os import getpid
from multiprocessing import Queue
from multiprocessing.process import Process
q = Queue()
q.put(getpid())
def child():
print(getpid(), q.get())
p = Process(target=child)
p.start()
p.join()
exarkun@baryon:/tmp/queue$ python queuedemo.py
(28469, 28467)
exarkun@baryon:/tmp/queue$
注意 q.get()
调用成功,并且放入队列的 pid 与取出它的进程的 pid 不同。
有些必要,这也扩展到具有 Parent-Descendant 和兄弟关系的进程。
所以:
- 全局变量仅在单个进程中共享
- 多处理模块提供了在彼此正确关联的进程之间共享状态的工具。
如果您想在没有这种关系的进程之间共享状态,还有多种其他选择 - 最好的选择更多地取决于您必须共享哪种状态以及您的共享模式是什么样的(您的问题中都没有包含这两个)。
这里:
# GatewayManager.py:
...
def initialize():
global sharedQueue
# ...
sharedQueue = multiprocessing.Queue()
# ...
您的 GatewayManager
模块在调用 initialize()
函数之前没有 sharedQueue
属性。因此,如果任何其他模块试图在 GatewayManager.initialize()
被调用,那么你当然会得到这个错误。由于 GatewayManager.initialize()
会在每次调用时盲目地重新绑定 sharedQueue
,如果您从另一个模块再次调用它,那么您将丢失已经创建的队列并获得一个新队列。
你想要的是确保你的 sharedqueue 只被创建一次,并且无论发生什么都会被创建。这里的解决方案(好吧,至少是一个解决方案 - 但它是一个已知的工作解决方案)是通过函数代理所有 GatewayManager.sharedQueue.whatever
访问,这些函数将在需要时负责初始化队列。
# gateway_manager.py
class _QueueProxy(object):
def __init__(self):
self._queueimp = None
@property
def _queue(self):
if self._queueimp is None:
isMonoCPU = multiprocessing.cpu_count() == 1
self._queueimp = queue.Queue() if isMonoCPU else multiprocessing.Queue()
return self._queueimp
def get(self, *args, **kw):
return self._queue.get(*args, **kw)
def put(self, *args, **kw):
return self._queue.put(*args, **kw)
# etc... only expose public methods and attributes of course
# and now our `shared_queue` instance
shared_queue = _QueueProxy()
现在您可以安全地(好吧几乎 - 队列创建不是原子的,所以您可能有竞争条件)从任何模块使用 gateway_manager.shared_queue
而无需关心初始化。
当然,如果您有两个不同的进程(我在这里不是在谈论 multiprocessing.Process
),您仍然会有两个不同的队列,但我假设您已经理解了这一点(如果不理解,请阅读 Jean-Paul 的回答)。