Python multiprocessing - 进程间的管道通信
Python multiprocessing - pipe communication between processes
我正在做一个项目,从客户传感器收集数据,处理收集的数据并将其发送回客户。可能有多个客户端同时要求从我们的服务器接收一些数据,所以我不得不实现多处理。我不能使用线程,因为某些变量必须与客户端无关。如果我这样做了,我的代码可能会变得难以阅读和升级,我不希望这样。所以我决定使用Processes,但是现在有一些数据需要在父进程和子进程之间进行剪切。经过一番研究,我发现管道通信可以满足我的要求。
以下代码成功将数据从父进程发送到子进程,子进程更新数据并将其发送回父进程。但它之所以起作用,只是因为 sleep() 函数阻止了父进程与子进程同时使用管道。
如何更改它以使其具有相同的功能,但没有 sleep() 函数,我认为它在未来很可能会导致问题?
from multiprocessing import Process, Pipe
import time
def update_data(pipe):
p_out, p_in = pipe
L = []
while True:
message = p_out.recv()
if message=='FINISHED':
break
L.append(message)
L.append(['new data']) #updating received data
writer(L, p_in) #sending received data to parent Process
p_in.close()
def writer(i, p_in):
p_in.send(i)
p_in.send('FINISHED')
L = ['0' for i in range(10)] #current data
if __name__=='__main__':
p_out, p_in = Pipe()
update_data_process = Process(target=update_data, args=((p_out, p_in),))
update_data_process.start()
writer(L, p_in) #sending current data to child Process
time.sleep(3) #needs to be changed
while True:
message = p_out.recv()
if message != 'FINISHED':
L = message
else:
break
print(L)
p_in.close()
update_data_process.join()
您遇到问题是因为您将连接视为 单工,但默认情况下 Pipe() returns 双工(双向)连接。
这意味着当您调用 parent_conn, child_conn = Pipe()
时,您会得到一个连接,仅 父级应该用于 reads and 写和另一个这样的子连接对象。父 和 子只对 他们的 连接对象进行操作。
from multiprocessing import Process, Pipe
from datetime import datetime
SENTINEL = 'SENTINEL'
def update_data(child_conn):
result = []
for msg in iter(child_conn.recv, SENTINEL):
print(f'{datetime.now()} child received {msg}')
result.append(msg)
print(f'{datetime.now()} child received sentinel')
result.append(['new data'])
writer(child_conn, result)
child_conn.close()
def writer(conn, data):
conn.send(data)
conn.send(SENTINEL)
if __name__=='__main__':
parent_conn, child_conn = Pipe() # default is duplex!
update_data_process = Process(target=update_data, args=(child_conn,))
update_data_process.start()
data = ['0' for i in range(3)]
writer(parent_conn, data)
for msg in iter(parent_conn.recv, SENTINEL):
print(f'{datetime.now()} parent received {msg}')
print(f'{datetime.now()} parent received sentinel')
parent_conn.close()
update_data_process.join()
示例输出:
2019-03-12 00:09:52.920375 child received ['0', '0', '0']
2019-03-12 00:09:52.920512 child received sentinel
2019-03-12 00:09:52.920702 parent received [['0', '0', '0'], ['new data']]
2019-03-12 00:09:52.920764 parent received sentinel
Process finished with exit code 0
如果您不熟悉 iter(callable, sentinel)
的用法,请查看 。
我正在做一个项目,从客户传感器收集数据,处理收集的数据并将其发送回客户。可能有多个客户端同时要求从我们的服务器接收一些数据,所以我不得不实现多处理。我不能使用线程,因为某些变量必须与客户端无关。如果我这样做了,我的代码可能会变得难以阅读和升级,我不希望这样。所以我决定使用Processes,但是现在有一些数据需要在父进程和子进程之间进行剪切。经过一番研究,我发现管道通信可以满足我的要求。
以下代码成功将数据从父进程发送到子进程,子进程更新数据并将其发送回父进程。但它之所以起作用,只是因为 sleep() 函数阻止了父进程与子进程同时使用管道。
如何更改它以使其具有相同的功能,但没有 sleep() 函数,我认为它在未来很可能会导致问题?
from multiprocessing import Process, Pipe
import time
def update_data(pipe):
p_out, p_in = pipe
L = []
while True:
message = p_out.recv()
if message=='FINISHED':
break
L.append(message)
L.append(['new data']) #updating received data
writer(L, p_in) #sending received data to parent Process
p_in.close()
def writer(i, p_in):
p_in.send(i)
p_in.send('FINISHED')
L = ['0' for i in range(10)] #current data
if __name__=='__main__':
p_out, p_in = Pipe()
update_data_process = Process(target=update_data, args=((p_out, p_in),))
update_data_process.start()
writer(L, p_in) #sending current data to child Process
time.sleep(3) #needs to be changed
while True:
message = p_out.recv()
if message != 'FINISHED':
L = message
else:
break
print(L)
p_in.close()
update_data_process.join()
您遇到问题是因为您将连接视为 单工,但默认情况下 Pipe() returns 双工(双向)连接。
这意味着当您调用 parent_conn, child_conn = Pipe()
时,您会得到一个连接,仅 父级应该用于 reads and 写和另一个这样的子连接对象。父 和 子只对 他们的 连接对象进行操作。
from multiprocessing import Process, Pipe
from datetime import datetime
SENTINEL = 'SENTINEL'
def update_data(child_conn):
result = []
for msg in iter(child_conn.recv, SENTINEL):
print(f'{datetime.now()} child received {msg}')
result.append(msg)
print(f'{datetime.now()} child received sentinel')
result.append(['new data'])
writer(child_conn, result)
child_conn.close()
def writer(conn, data):
conn.send(data)
conn.send(SENTINEL)
if __name__=='__main__':
parent_conn, child_conn = Pipe() # default is duplex!
update_data_process = Process(target=update_data, args=(child_conn,))
update_data_process.start()
data = ['0' for i in range(3)]
writer(parent_conn, data)
for msg in iter(parent_conn.recv, SENTINEL):
print(f'{datetime.now()} parent received {msg}')
print(f'{datetime.now()} parent received sentinel')
parent_conn.close()
update_data_process.join()
示例输出:
2019-03-12 00:09:52.920375 child received ['0', '0', '0']
2019-03-12 00:09:52.920512 child received sentinel
2019-03-12 00:09:52.920702 parent received [['0', '0', '0'], ['new data']]
2019-03-12 00:09:52.920764 parent received sentinel
Process finished with exit code 0
如果您不熟悉 iter(callable, sentinel)
的用法,请查看