Python 多进程共享内存;一次写入,多次读取
Python multiprocessing shared memory; one write, multiple read
系统
- Linux(Manjaro KDE)
- Python 3.8.3
程序:
我在 UDP 端口上有传入的字符串数据。主循环在使用选择器监视 UDP 端口之前将进程假脱机。我想要 UDP 数据,它不断更新,可用于每个进程。
尝试过:
- maxsize = 1 的多处理队列令人头疼并很快崩溃。
- 多处理阵列(这是我现在所在的位置)
我已经检查过,我正在查看的每个位置的数组都具有相同的内存地址(我认为)。无论出于何种原因,当我尝试访问子进程中数组的内容时,进程挂起。
没试过
- 管道。我有一种感觉,这可能是要走的路。但我已经深入未知领域;我以前从未使用过它们。
我想要什么
我想从子进程访问 UDP 数据 - 这些是 camera_view 方法。
虚拟 UDP 字符串
import socket
import random
import datetime
import time
conn = ('127.0.0.1', 6666)
def rand_value(f_val, t_val):
result = round(random.uniform(f_val, t_val), 2)
result = random.uniform(f_val, t_val)
return result
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
while True:
time.sleep(6)
timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
overlay = timestamp
for i in range(9):
val = rand_value(i*10, i*10+10)
if i == 8: val = 'TASK: Im the real Batman'
overlay = overlay + "," + str(val)
print(overlay)
sock.sendto(overlay.encode(), conn)
我的程序
import datetime
import selectors
import socket
import time
from multiprocessing import Lock, Process, Queue
from multiprocessing.sharedctypes import Array
from ctypes import c_char_p
REQUIRED_CAMERAS = 1
CAMERA_CONN = {'name':['Colour Camera'], 'ip':['127.0.0.1'], 'port':[9000]}
OVERLAY_CONN = ('0.0.0.0', 6666)
CONTROL_CONN = ('0.0.0.0', 6667)
NUMBER_OF_ITEMS_IN_OVERLAY = 10
class Camera():
def __init__(self, cam_name, cam_ip, cam_port):
self.ip = cam_ip
self.port = cam_port
self.capture = cv2.VideoCapture(0)
self.frame_width = int(self.capture.get(3))
self.frame_height = int(self.capture.get(4))
self.name = cam_name
def get_overlay(data_packet):
data = data_packet.decode()
data = data.split(',')
field0 = data[0]
field1 = 'KP: ' + str(round(float(data[1]), 3))
field2 = 'DCC: ' + str(round(float(data[2]), 2)) + 'm'
field3 = 'E: ' + str(round(float(data[3]), 2)) + 'm'
field4 = 'N: ' + str(round(float(data[4]), 2)) + 'm'
field5 = 'D: ' + str(round(float(data[5]), 2)) + 'm'
field6 = 'H: ' + str(round(float(data[6]), 2)) # + '°'
field7 = 'R: ' + str(round(float(data[7]), 2)) # + '°'
field8 = 'P: ' + str(round(float(data[8]), 2)) # + '°'
field9 = data[9]
x = []
for i in range(NUMBER_OF_ITEMS_IN_OVERLAY):
x.append(eval('field' + str(i)).encode())
# if i == 0:
# print(x[i])
return x
def socket_reader(sock, mask, q, REQUIRED_CAMERAS, overlay):
data_packet, sensor_ip = sock.recvfrom(1024)
sensor_port = sock.getsockname()[1]
print(f'SENSOR PORT {sensor_port} and SENSOR_IP {sensor_ip}')
if sensor_port == OVERLAY_CONN[1]:
x = get_overlay(data_packet)
for i in range(len(x)):
overlay[i] = x[i]
print(f'Socket Reader {overlay}')
def camera_view(CAMERA_CONN, cam_name, camera, overlay_q, control_q, overlay):
while True:
print(f'PROCESS {camera} RUNNING FOR: {cam_name}')
try:
print(f'Camera View {overlay}')
for i in range(len(overlay)):
print(overlay[i])
except:
pass
time.sleep(1)
def controller(REQUIRED_CAMERAS, CAMERA_CONN, OVERLAY_CONN, CONTROL_CONN):
if REQUIRED_CAMERAS > len(CAMERA_CONN['name']):
print(f'REQURIED_CAMERAS: {REQUIRED_CAMERAS} - more than connections in CAMERA_CONN ')
else:
# Set up a UDP connection for the overlay string and the control commands
sock_overlay = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock_control = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock_overlay.bind(OVERLAY_CONN)
sock_control.bind(CONTROL_CONN)
# Set up the selector to watch over the socket
# and trigger when data is ready for reading
sel = selectors.DefaultSelector()
sel.register(fileobj=sock_overlay, events=selectors.EVENT_READ, data=socket_reader)
sel.register(fileobj=sock_control, events=selectors.EVENT_READ, data=socket_reader)
# create shared memory
overlay_q = Queue(maxsize=1)
control_q = Queue(maxsize=1)
overlay = Array(c_char_p, range(NUMBER_OF_ITEMS_IN_OVERLAY))
print(f'Init Overlay {overlay}')
# Generate the processes; one per camera
processes = []
for camera in range(REQUIRED_CAMERAS):
processes.append(Process(target=camera_view, args=(CAMERA_CONN, CAMERA_CONN['name'][camera], camera, overlay_q, control_q, overlay)))
for process in processes:
process.daemon = True
process.start()
# Spin over the selector
while True:
# Only have one connnection registered, so to stop
# the loop spinning up the CPU, I have made it blocking
# with the timeout = 1 (sec) instead of =0.
events = sel.select(timeout=None)
for key, mask in events:
# the selector callback is the data= from the register above
callback = key.data
# the callback gets the sock, mask and the sensor queues
if key.fileobj == sock_overlay:
callback(key.fileobj, mask, overlay_q, REQUIRED_CAMERAS, overlay)
else:
callback(key.fileobj, mask, control_q, REQUIRED_CAMERAS, overlay)
if __name__ == "__main__":
controller(REQUIRED_CAMERAS, CAMERA_CONN, OVERLAY_CONN, CONTROL_CONN)
编辑 1:
from multiprocessing import Process, Array
from ctypes import c_char_p
import time
def worker(arr):
count = 0
while True:
count += 1
val = 'val' + str(count)
arr[0] = val
print(arr[:])
time.sleep(2)
def main():
arr = Array(c_char_p, 1)
p = Process(target=worker, args=(arr,))
p.daemon = True
p.start()
while True:
print(arr[:])
try:
print(arr[:].decode('utf-8'))
except :
pass
# try:
# val = arr[:]
# val = val.decode('utf-8')
# print(f'main {val}')
# except:
# pass
time.sleep(1)
if __name__ == "__main__":
main()
'''
from multiprocessing import Process, Array
from ctypes import c_char_p
import time
def worker(arr):
count = 0
while True:
count += 1
val = 'val' + str(count)
arr[0] = bytes(val, 'utf-8')
print(arr[:])
time.sleep(2)
def main():
arr = Array(c_char_p, 1)
p = Process(target=worker, args=(arr,))
p.daemon = True
p.start()
while True:
print(arr[:])
try:
print(arr[:].decode('utf-8'))
except :
pass
time.sleep(1)
if __name__ == "__main__":
main()
if __name__ == "__main__":
main()
'''
编辑2:
感谢@RolandSmith,我坚持使用队列,我想我已经有了一个关于如何前进的模板。请参见下面的代码。如果我不能让它在程序中工作,我会回到这里。
from multiprocessing import Process, Queue
import time
import datetime
def worker(camera, q):
val = ''
while True:
if q.full() == True:
val = q.get()
else:
val = val
print(f'WORKER{camera} {val}')
time.sleep(0.2)
def main():
cameras = 2
processes = []
queues = []
for camera in range(cameras):
queues.append(Queue(maxsize=1))
processes.append(Process(target=worker, args=(camera, queues[camera])))
for process in processes:
process.daemon = True
process.start()
while True:
for q in queues:
if not q.empty():
try:
_ = q.get()
except:
pass
else:
q.put(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
time.sleep(.5)
if __name__ == "__main__":
main()
在我看来,使用 Queue
比使用 Array
少 error-prone 的解决方案。
这是您的第二个示例,已转换为使用 Queue
:
from multiprocessing import Process, Queue
import time
def worker(q):
count = 0
while True:
count += 1
val = 'val' + str(count)
q.put(val)
print('worker:', val)
time.sleep(2)
def main():
q = Queue()
p = Process(target=worker, args=(q, ))
p.daemon = True
p.start()
while True:
if not q.empty():
print('main:', q.get())
time.sleep(1)
if __name__ == "__main__":
main()
这产生:
> python3 test3.py
worker: val1
main: val1
worker: val2
main: val2
worker: val3
main: val3
worker: val4
main: val4
worker: val5
这是使用 Pipe
:
的相同示例
from multiprocessing import Process, Pipe
import time
def worker(p):
count = 0
while True:
count += 1
val = 'val' + str(count)
p.send(val)
print('worker:', val)
time.sleep(2)
def main():
child, parent = Pipe()
p = Process(target=worker, args=(child, ))
p.daemon = True
p.start()
while True:
if parent.poll():
print('main:', parent.recv())
time.sleep(1)
if __name__ == "__main__":
main()
这会产生与上一个示例相同的结果。
此外,默认情况下管道是双向的。
因此,您还可以将工作人员的数据发送回 parent.
系统
- Linux(Manjaro KDE)
- Python 3.8.3
程序:
我在 UDP 端口上有传入的字符串数据。主循环在使用选择器监视 UDP 端口之前将进程假脱机。我想要 UDP 数据,它不断更新,可用于每个进程。
尝试过:
- maxsize = 1 的多处理队列令人头疼并很快崩溃。
- 多处理阵列(这是我现在所在的位置)
我已经检查过,我正在查看的每个位置的数组都具有相同的内存地址(我认为)。无论出于何种原因,当我尝试访问子进程中数组的内容时,进程挂起。
没试过
- 管道。我有一种感觉,这可能是要走的路。但我已经深入未知领域;我以前从未使用过它们。
我想要什么
我想从子进程访问 UDP 数据 - 这些是 camera_view 方法。
虚拟 UDP 字符串
import socket
import random
import datetime
import time
conn = ('127.0.0.1', 6666)
def rand_value(f_val, t_val):
result = round(random.uniform(f_val, t_val), 2)
result = random.uniform(f_val, t_val)
return result
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
while True:
time.sleep(6)
timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
overlay = timestamp
for i in range(9):
val = rand_value(i*10, i*10+10)
if i == 8: val = 'TASK: Im the real Batman'
overlay = overlay + "," + str(val)
print(overlay)
sock.sendto(overlay.encode(), conn)
我的程序
import datetime
import selectors
import socket
import time
from multiprocessing import Lock, Process, Queue
from multiprocessing.sharedctypes import Array
from ctypes import c_char_p
REQUIRED_CAMERAS = 1
CAMERA_CONN = {'name':['Colour Camera'], 'ip':['127.0.0.1'], 'port':[9000]}
OVERLAY_CONN = ('0.0.0.0', 6666)
CONTROL_CONN = ('0.0.0.0', 6667)
NUMBER_OF_ITEMS_IN_OVERLAY = 10
class Camera():
def __init__(self, cam_name, cam_ip, cam_port):
self.ip = cam_ip
self.port = cam_port
self.capture = cv2.VideoCapture(0)
self.frame_width = int(self.capture.get(3))
self.frame_height = int(self.capture.get(4))
self.name = cam_name
def get_overlay(data_packet):
data = data_packet.decode()
data = data.split(',')
field0 = data[0]
field1 = 'KP: ' + str(round(float(data[1]), 3))
field2 = 'DCC: ' + str(round(float(data[2]), 2)) + 'm'
field3 = 'E: ' + str(round(float(data[3]), 2)) + 'm'
field4 = 'N: ' + str(round(float(data[4]), 2)) + 'm'
field5 = 'D: ' + str(round(float(data[5]), 2)) + 'm'
field6 = 'H: ' + str(round(float(data[6]), 2)) # + '°'
field7 = 'R: ' + str(round(float(data[7]), 2)) # + '°'
field8 = 'P: ' + str(round(float(data[8]), 2)) # + '°'
field9 = data[9]
x = []
for i in range(NUMBER_OF_ITEMS_IN_OVERLAY):
x.append(eval('field' + str(i)).encode())
# if i == 0:
# print(x[i])
return x
def socket_reader(sock, mask, q, REQUIRED_CAMERAS, overlay):
data_packet, sensor_ip = sock.recvfrom(1024)
sensor_port = sock.getsockname()[1]
print(f'SENSOR PORT {sensor_port} and SENSOR_IP {sensor_ip}')
if sensor_port == OVERLAY_CONN[1]:
x = get_overlay(data_packet)
for i in range(len(x)):
overlay[i] = x[i]
print(f'Socket Reader {overlay}')
def camera_view(CAMERA_CONN, cam_name, camera, overlay_q, control_q, overlay):
while True:
print(f'PROCESS {camera} RUNNING FOR: {cam_name}')
try:
print(f'Camera View {overlay}')
for i in range(len(overlay)):
print(overlay[i])
except:
pass
time.sleep(1)
def controller(REQUIRED_CAMERAS, CAMERA_CONN, OVERLAY_CONN, CONTROL_CONN):
if REQUIRED_CAMERAS > len(CAMERA_CONN['name']):
print(f'REQURIED_CAMERAS: {REQUIRED_CAMERAS} - more than connections in CAMERA_CONN ')
else:
# Set up a UDP connection for the overlay string and the control commands
sock_overlay = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock_control = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock_overlay.bind(OVERLAY_CONN)
sock_control.bind(CONTROL_CONN)
# Set up the selector to watch over the socket
# and trigger when data is ready for reading
sel = selectors.DefaultSelector()
sel.register(fileobj=sock_overlay, events=selectors.EVENT_READ, data=socket_reader)
sel.register(fileobj=sock_control, events=selectors.EVENT_READ, data=socket_reader)
# create shared memory
overlay_q = Queue(maxsize=1)
control_q = Queue(maxsize=1)
overlay = Array(c_char_p, range(NUMBER_OF_ITEMS_IN_OVERLAY))
print(f'Init Overlay {overlay}')
# Generate the processes; one per camera
processes = []
for camera in range(REQUIRED_CAMERAS):
processes.append(Process(target=camera_view, args=(CAMERA_CONN, CAMERA_CONN['name'][camera], camera, overlay_q, control_q, overlay)))
for process in processes:
process.daemon = True
process.start()
# Spin over the selector
while True:
# Only have one connnection registered, so to stop
# the loop spinning up the CPU, I have made it blocking
# with the timeout = 1 (sec) instead of =0.
events = sel.select(timeout=None)
for key, mask in events:
# the selector callback is the data= from the register above
callback = key.data
# the callback gets the sock, mask and the sensor queues
if key.fileobj == sock_overlay:
callback(key.fileobj, mask, overlay_q, REQUIRED_CAMERAS, overlay)
else:
callback(key.fileobj, mask, control_q, REQUIRED_CAMERAS, overlay)
if __name__ == "__main__":
controller(REQUIRED_CAMERAS, CAMERA_CONN, OVERLAY_CONN, CONTROL_CONN)
编辑 1:
from multiprocessing import Process, Array
from ctypes import c_char_p
import time
def worker(arr):
count = 0
while True:
count += 1
val = 'val' + str(count)
arr[0] = val
print(arr[:])
time.sleep(2)
def main():
arr = Array(c_char_p, 1)
p = Process(target=worker, args=(arr,))
p.daemon = True
p.start()
while True:
print(arr[:])
try:
print(arr[:].decode('utf-8'))
except :
pass
# try:
# val = arr[:]
# val = val.decode('utf-8')
# print(f'main {val}')
# except:
# pass
time.sleep(1)
if __name__ == "__main__":
main()
'''
from multiprocessing import Process, Array
from ctypes import c_char_p
import time
def worker(arr):
count = 0
while True:
count += 1
val = 'val' + str(count)
arr[0] = bytes(val, 'utf-8')
print(arr[:])
time.sleep(2)
def main():
arr = Array(c_char_p, 1)
p = Process(target=worker, args=(arr,))
p.daemon = True
p.start()
while True:
print(arr[:])
try:
print(arr[:].decode('utf-8'))
except :
pass
time.sleep(1)
if __name__ == "__main__":
main()
if __name__ == "__main__":
main()
'''
编辑2:
感谢@RolandSmith,我坚持使用队列,我想我已经有了一个关于如何前进的模板。请参见下面的代码。如果我不能让它在程序中工作,我会回到这里。
from multiprocessing import Process, Queue
import time
import datetime
def worker(camera, q):
val = ''
while True:
if q.full() == True:
val = q.get()
else:
val = val
print(f'WORKER{camera} {val}')
time.sleep(0.2)
def main():
cameras = 2
processes = []
queues = []
for camera in range(cameras):
queues.append(Queue(maxsize=1))
processes.append(Process(target=worker, args=(camera, queues[camera])))
for process in processes:
process.daemon = True
process.start()
while True:
for q in queues:
if not q.empty():
try:
_ = q.get()
except:
pass
else:
q.put(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
time.sleep(.5)
if __name__ == "__main__":
main()
在我看来,使用 Queue
比使用 Array
少 error-prone 的解决方案。
这是您的第二个示例,已转换为使用 Queue
:
from multiprocessing import Process, Queue
import time
def worker(q):
count = 0
while True:
count += 1
val = 'val' + str(count)
q.put(val)
print('worker:', val)
time.sleep(2)
def main():
q = Queue()
p = Process(target=worker, args=(q, ))
p.daemon = True
p.start()
while True:
if not q.empty():
print('main:', q.get())
time.sleep(1)
if __name__ == "__main__":
main()
这产生:
> python3 test3.py
worker: val1
main: val1
worker: val2
main: val2
worker: val3
main: val3
worker: val4
main: val4
worker: val5
这是使用 Pipe
:
from multiprocessing import Process, Pipe
import time
def worker(p):
count = 0
while True:
count += 1
val = 'val' + str(count)
p.send(val)
print('worker:', val)
time.sleep(2)
def main():
child, parent = Pipe()
p = Process(target=worker, args=(child, ))
p.daemon = True
p.start()
while True:
if parent.poll():
print('main:', parent.recv())
time.sleep(1)
if __name__ == "__main__":
main()
这会产生与上一个示例相同的结果。
此外,默认情况下管道是双向的。 因此,您还可以将工作人员的数据发送回 parent.