从父进程访问 Python Multiprocessing.Process 子类的状态
Accessing the state of a Python Multiprocessing.Process subclass from the parent process
我正在创建一个简单的 TCP 服务器作为存根,这样我就可以测试一个运行测试设备的脚本,而不必在那里安装设备。服务器应该坐在那里等待连接,然后维护和更新状态变量(只是 6 个整数的列表)以响应它接收到的命令。然后父进程(例如单元测试 class)应该能够随时查询状态。
服务器的界面应该很简单:
server = StubServer()
server.start()
'''
the client script connects with the server and
some stuff happens to change the state
'''
newState = server.getState() # newState = [93,93,93,3,3,45] for example
server.terminate()
我已经 subclassed Multiprocessing.Process 执行此操作,我可以毫无问题地启动服务器。当我第一次对此进行测试时,在 getState() 方法中我只是返回了实例变量 _state 但我发现这始终只是初始状态。经过一番挖掘后,我找不到任何类似的例子。很多关于 subclassing 过程,但不是这个具体问题。最终,我将下面的代码放在一起,它使用内部 Queue() 来存储状态,但这对我来说看起来很混乱和笨重。有更好的方法吗?
import socket
from multiprocessing import Process, Queue
class StubServer(Process):
_port = 4001
_addr = '' # all addresses 0.0.0.0
_sock = None
_state = []
_queue = None
def __init__(self, initState=[93,93,93,93,93,93]):
super(StubServer, self).__init__()
self._queue = Queue()
self._state = initState
def run(self):
# Put the state into the queue
self._queue.put(self._state)
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._sock.bind((self._addr, self._port))
self._sock.listen(1)
waitingForConnection = True
'''
Main loop will continue until the connection is terminated. if a connection is closed, the loop returns
to the start and waits for a new connection. This means multiple tests can be run with the same server
'''
while 1:
# Wait for a connection, or go back and wait for a new message (if a connection already exists)
if waitingForConnection:
waitingForConnection = False
conn, addr = self._sock.accept()
chunk = ''
chunks = []
while '\x03' not in chunk: # '\x03' is terminating character for a message
chunk = conn.recv(8192)
if not chunk: # Connection terminated, start the loop again and wait for a new connection
waitingForConnection = True
break
chunks.append(chunk)
message = ''.join(chunks)
# Now do some stuff to parse the message, and update the state if we received a command
if isACommand(message):
_updateState(message)
conn.close()
return
def getState(self):
# This is called from the parent process, so return the object on the queue
state = self._queue.get()
# But put the state back in the queue again so it's there if this method is called again before a state update
self._queue.put(state)
return state
def _updateState(self, message):
# Do some stuff to figure out what to update then update the state
self._state[updatedElementIndex] = updatedValue
# Now empty the queue and put the new state in the queue
while not self._queue.empty():
self._queue.get()
self._queue.put(self._state)
return
顾名思义,multiprocessing
使用不同的进程。在某个时候,调用fork()
,子进程复制父进程的内存,子进程保留自己的内存,不与父进程共享。
不幸的是,您必须使用tools available在进程之间共享内存,导致您提到的代码开销。
您可以查找使用共享内存进行并行处理的其他方法,但请注意 threads/processes/nodes/etc 之间共享内存绝非易事。
您可以随时将存根服务器的状态转储到文件并从单元测试中读取它。这是满足测试需求的非常简单的解决方案。
您需要做的一切:
- 将
filename
作为参数传递给构造函数
- 使用初始值调用
_updateState
- 重写
_updateState
以将状态写入 filename
。最好在 filename
附近创建一个新文件并替换它。如果你担心原子性。
谢谢 Felipe,我的问题主要是 'is there a better way than using queues',正如我在问题中所做的那样。经过更多研究(由于您提到共享内存),我发现共享数组对于这种情况要好得多:
import socket
from multiprocessing import Process, Array
class StubServer(Process):
_port = 4001
_addr = '' # all addresses 0.0.0.0
_sock = None
_state = None
_queue = None
def __init__(self, initState=[93,93,93,93,93,93]):
super(StubServer, self).__init__()
self._state = Array('i', initState) # Is always a 6 element array
def run(self):
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._sock.bind((self._addr, self._port))
self._sock.listen(1)
waitingForConnection = True
'''
Main loop will continue until process is terminated. if a connection is closed, the loop returns
to the start and waits for a new connection. This means multiple tests can be run with the same server
'''
while 1:
# Wait for a connection, or go back and wait for a new message (if a connection already exists)
if waitingForConnection:
waitingForConnection = False
conn, addr = self._sock.accept()
chunk = ''
chunks = []
while '\x03' not in chunk: # '\x03' is terminating character for a message
chunk = conn.recv(8192)
if not chunk: # Connection terminated, start the loop again and wait for a new connection
waitingForConnection = True
break
chunks.append(chunk)
message = ''.join(chunks)
# Now do some stuff to parse the message, and update the state if we received a command
if isACommand(message):
_updateState(message)
conn.close()
return
def getState(self):
# Aquire the lock return the contents of the shared array
with self._state.get_lock():
return self._state[:6] # This is OK because we know it is always a 6 element array
return state
def _updateState(self, message):
# Do some stuff to figure out what to update then..
# Aquire the lock and update the appropriate element in the shared array
with self._state.get_lock():
self._state[updatedElementIndex] = updatedValue
return
这很有用,而且更优雅。感谢您的帮助
我正在创建一个简单的 TCP 服务器作为存根,这样我就可以测试一个运行测试设备的脚本,而不必在那里安装设备。服务器应该坐在那里等待连接,然后维护和更新状态变量(只是 6 个整数的列表)以响应它接收到的命令。然后父进程(例如单元测试 class)应该能够随时查询状态。
服务器的界面应该很简单:
server = StubServer()
server.start()
'''
the client script connects with the server and
some stuff happens to change the state
'''
newState = server.getState() # newState = [93,93,93,3,3,45] for example
server.terminate()
我已经 subclassed Multiprocessing.Process 执行此操作,我可以毫无问题地启动服务器。当我第一次对此进行测试时,在 getState() 方法中我只是返回了实例变量 _state 但我发现这始终只是初始状态。经过一番挖掘后,我找不到任何类似的例子。很多关于 subclassing 过程,但不是这个具体问题。最终,我将下面的代码放在一起,它使用内部 Queue() 来存储状态,但这对我来说看起来很混乱和笨重。有更好的方法吗?
import socket
from multiprocessing import Process, Queue
class StubServer(Process):
_port = 4001
_addr = '' # all addresses 0.0.0.0
_sock = None
_state = []
_queue = None
def __init__(self, initState=[93,93,93,93,93,93]):
super(StubServer, self).__init__()
self._queue = Queue()
self._state = initState
def run(self):
# Put the state into the queue
self._queue.put(self._state)
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._sock.bind((self._addr, self._port))
self._sock.listen(1)
waitingForConnection = True
'''
Main loop will continue until the connection is terminated. if a connection is closed, the loop returns
to the start and waits for a new connection. This means multiple tests can be run with the same server
'''
while 1:
# Wait for a connection, or go back and wait for a new message (if a connection already exists)
if waitingForConnection:
waitingForConnection = False
conn, addr = self._sock.accept()
chunk = ''
chunks = []
while '\x03' not in chunk: # '\x03' is terminating character for a message
chunk = conn.recv(8192)
if not chunk: # Connection terminated, start the loop again and wait for a new connection
waitingForConnection = True
break
chunks.append(chunk)
message = ''.join(chunks)
# Now do some stuff to parse the message, and update the state if we received a command
if isACommand(message):
_updateState(message)
conn.close()
return
def getState(self):
# This is called from the parent process, so return the object on the queue
state = self._queue.get()
# But put the state back in the queue again so it's there if this method is called again before a state update
self._queue.put(state)
return state
def _updateState(self, message):
# Do some stuff to figure out what to update then update the state
self._state[updatedElementIndex] = updatedValue
# Now empty the queue and put the new state in the queue
while not self._queue.empty():
self._queue.get()
self._queue.put(self._state)
return
顾名思义,multiprocessing
使用不同的进程。在某个时候,调用fork()
,子进程复制父进程的内存,子进程保留自己的内存,不与父进程共享。
不幸的是,您必须使用tools available在进程之间共享内存,导致您提到的代码开销。
您可以查找使用共享内存进行并行处理的其他方法,但请注意 threads/processes/nodes/etc 之间共享内存绝非易事。
您可以随时将存根服务器的状态转储到文件并从单元测试中读取它。这是满足测试需求的非常简单的解决方案。
您需要做的一切:
- 将
filename
作为参数传递给构造函数 - 使用初始值调用
_updateState
- 重写
_updateState
以将状态写入filename
。最好在filename
附近创建一个新文件并替换它。如果你担心原子性。
谢谢 Felipe,我的问题主要是 'is there a better way than using queues',正如我在问题中所做的那样。经过更多研究(由于您提到共享内存),我发现共享数组对于这种情况要好得多:
import socket
from multiprocessing import Process, Array
class StubServer(Process):
_port = 4001
_addr = '' # all addresses 0.0.0.0
_sock = None
_state = None
_queue = None
def __init__(self, initState=[93,93,93,93,93,93]):
super(StubServer, self).__init__()
self._state = Array('i', initState) # Is always a 6 element array
def run(self):
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._sock.bind((self._addr, self._port))
self._sock.listen(1)
waitingForConnection = True
'''
Main loop will continue until process is terminated. if a connection is closed, the loop returns
to the start and waits for a new connection. This means multiple tests can be run with the same server
'''
while 1:
# Wait for a connection, or go back and wait for a new message (if a connection already exists)
if waitingForConnection:
waitingForConnection = False
conn, addr = self._sock.accept()
chunk = ''
chunks = []
while '\x03' not in chunk: # '\x03' is terminating character for a message
chunk = conn.recv(8192)
if not chunk: # Connection terminated, start the loop again and wait for a new connection
waitingForConnection = True
break
chunks.append(chunk)
message = ''.join(chunks)
# Now do some stuff to parse the message, and update the state if we received a command
if isACommand(message):
_updateState(message)
conn.close()
return
def getState(self):
# Aquire the lock return the contents of the shared array
with self._state.get_lock():
return self._state[:6] # This is OK because we know it is always a 6 element array
return state
def _updateState(self, message):
# Do some stuff to figure out what to update then..
# Aquire the lock and update the appropriate element in the shared array
with self._state.get_lock():
self._state[updatedElementIndex] = updatedValue
return
这很有用,而且更优雅。感谢您的帮助