How to pass video stream from one Python to another?
In my previous post, we found a way to pass an image file from one Python process to another:
Now I am trying to pass a video (a succession of images):
write.py
import sys
import numpy as np
import cv2
from PIL import Image
import io
import time

while True:
    img = cv2.imread('cat.jpg')
    bimg = cv2.imencode('.jpg', img)[1]
    sys.stdout.buffer.write(bimg)
    sys.stdout.flush()
    time.sleep(1)
read.py:
import sys
from PIL import Image
import io
import cv2
import numpy as np
from io import BytesIO

while True:
    data = sys.stdin.buffer.read()
    img_np = cv2.imdecode(np.frombuffer(BytesIO(data).read(), np.uint8), cv2.IMREAD_UNCHANGED)
    cv2.imshow('image', img_np)
    cv2.waitKey(0)
If I pipe write.py's output to the terminal, it prints. If I hand the data to read.py manually, it gets read. But when I put them together (python3 write.py | python3 read.py), it just hangs. write.py only writes once, and read.py never seems to receive anything.
My guess is that the reading code is waiting for the writing code to "finish" before it packages up the bytes and treats them as an image. Although if that were the case, I would have thought flushing would fix it.
I think I figured it out. In read.py, sys.stdin.buffer.read() reads and waits until the stdin pipe is closed, but write.py never actually closes its stdout because of the while True loop. This simplified proof of concept demonstrates the issue:
write.py
import sys
import time

sys.stdout.buffer.write(b"Hello world")
sys.stdout.buffer.flush()

# Note: if we comment out the code below it works again
while True:
    # Keep this alive but don't have `while True: pass`
    # because my computer might crash :D
    time.sleep(10)
and read.py:
import sys

with open("output.txt", "w") as file:
    file.write(sys.stdin.read())
This also hangs and never writes anything to "output.txt". If we remove the while True loop from write.py, the code no longer hangs and "Hello world" is written to "output.txt", because when write.py finishes writing, its process exits and that closes the pipe. To fix the problem, I suggest changing read.py to something like this:
import sys

while True:
    with open("output.txt", "a") as file:
        file.write(sys.stdin.read(1))
Solution:
write.py
import sys
import time
MAX_FILE_SIZE = 16 # bytes
msg = b"Hello world"
# Tell `reader.py` that it needs to read x number of bytes.
length = len(msg)
# We also need to tell `read.py` how many bytes it needs to read.
# This means that we have reached the same problem as before.
# To fix that issue we are always going to send the number of bytes but
# We are going to pad it with `0`s at the start.
#
length = str(length).zfill(MAX_FILE_SIZE)
sys.stdout.buffer.write(length.encode())
sys.stdout.buffer.write(msg)
sys.stdout.buffer.flush()
# We also need to tell `read.py` that this was the last file that we sent.
# Sending `1` means that the stream has ended.
sys.stdout.buffer.write(b"1")
sys.stdout.buffer.flush()

# Note: if we comment out the code below it works again
while True:
    # Keep this alive but don't have `while True: pass`
    # because my computer might crash :D
    time.sleep(10)
and read.py:
import sys
import time

MAX_FILE_SIZE = 16 # bytes

while True:
    time.sleep(1) # Make sure `write.py` has sent the data
    # Read `MAX_FILE_SIZE` number of bytes and convert it to an int
    # so that we know the size of the file coming in
    length = int(sys.stdin.buffer.read(MAX_FILE_SIZE))
    time.sleep(1) # Make sure `write.py` has sent the data
    # Here you can switch to a different file every time `write.py`
    # sends a new file
    with open("output.txt", "wb") as file:
        file.write(sys.stdin.buffer.read(length))
    file_ended = sys.stdin.buffer.read(1)
    if file_ended == b"1":
        # File has ended
        break
    else:
        # We are going to start reading again for the next file
        pass
Edit:
The solution works like this:
- Send the size of the file
- Send the actual file data
- Send one byte that tells read.py whether it should expect another file
For part 1 we just encode the file's length as a string padded with 0s at the front. Note: make sure MAX_FILE_SIZE is larger than your largest file (very large numbers degrade performance slightly). For part 3, if we send "1" it means there are no more files to send; otherwise reader.py will wait for and accept the next file. So write.py becomes:
from math import log
import time
import sys
import cv2

MAX_FILE_SIZE = 62914560 # bytes
MAX_FILE_SIZE = int(log(MAX_FILE_SIZE, 2) + 1) # number of characters in the zero-padded length field

def write_file(buffer, data, last_file=False):
    # Tell `read.py` that it needs to read x number of bytes.
    length = len(data)
    # We also need to tell `read.py` how many bytes it needs to read.
    # This means that we have reached the same problem as before.
    # To fix that issue we are always going to send the number of bytes but
    # we are going to pad it with `0`s at the start.
    length = str(length).zfill(MAX_FILE_SIZE)
    with open("output.txt", "w") as file:
        file.write(length)
    buffer.write(length.encode())
    # Write the actual data
    buffer.write(data)
    # We also need to tell `read.py` whether this was the last file that we sent.
    # Sending `1` means that the stream has ended.
    buffer.write(str(int(last_file)).encode())
    buffer.flush()

while True:
    img = cv2.imread("img.jpg")
    bimg = cv2.imencode(".jpg", img)[1]
    # Call write_file
    write_file(sys.stdout.buffer, bimg, last_file=False)
    # time.sleep(1) # Don't need this
and read.py becomes:
from io import BytesIO
from math import log
import numpy as np
import time
import cv2
import sys

MAX_FILE_SIZE = 62914560 # bytes
MAX_FILE_SIZE = int(log(MAX_FILE_SIZE, 2) + 1) # number of characters in the zero-padded length field

def read(buffer, number_of_bytes):
    output = b""
    while len(output) < number_of_bytes:
        output += buffer.read(number_of_bytes - len(output))
    assert len(output) == number_of_bytes, "An error occurred."
    return output

def read_file(buffer):
    # Read `MAX_FILE_SIZE` number of bytes and convert it to an int
    # so that we know the size of the file coming in
    length = int(read(buffer, MAX_FILE_SIZE))
    # Here you can switch to a different file every time `write.py`
    # sends a new file
    data = read(buffer, length)
    # Read a byte so that we know if it is the last file
    file_ended = read(buffer, 1)
    return data, (file_ended == b"1")

while True:
    print("Reading file")
    data, last_file = read_file(sys.stdin.buffer)
    img_np = cv2.imdecode(np.frombuffer(BytesIO(data).read(), np.uint8),
                          cv2.IMREAD_UNCHANGED)
    cv2.imshow("image", img_np)
    cv2.waitKey(0)
    if last_file:
        break
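As an aside, the same framing can be done with a fixed-width binary length prefix instead of a zero-padded decimal string. This is not part of the answer above, just a sketch of the same length-prefix idea using struct:

import struct
import sys

def send_frame(buffer, data, last=False):
    # 4-byte big-endian length prefix, then the payload, then a 1-byte end flag
    buffer.write(struct.pack(">I", len(data)))
    buffer.write(data)
    buffer.write(b"1" if last else b"0")
    buffer.flush()

def recv_exact(buffer, n):
    # keep reading until exactly n bytes have arrived
    out = b""
    while len(out) < n:
        chunk = buffer.read(n - len(out))
        if not chunk:
            raise EOFError("pipe closed early")
        out += chunk
    return out

def recv_frame(buffer):
    (length,) = struct.unpack(">I", recv_exact(buffer, 4))
    data = recv_exact(buffer, length)
    last = recv_exact(buffer, 1) == b"1"
    return data, last

# usage: write_file/read_file above could be replaced with
# send_frame(sys.stdout.buffer, bimg.tobytes()) and recv_frame(sys.stdin.buffer)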
Two solutions: ZeroMQ | DiskCache
It is very easy to send frames from one python file to another using ZeroMQ.
ZeroMQ
Install it via PyPI: pip install -U pyzmq. There are multiple ways to send frames; here is an example using a PUBLISHER and a SUBSCRIBER:
# writer | publisher
import base64
import time
import zmq
import cv2

# Prepare our context and publisher
context = zmq.Context()
publisher = context.socket(zmq.PUB)
publisher.bind("tcp://*:5563")

CAM_INDEX_OR_URI = 0
capture = cv2.VideoCapture(CAM_INDEX_OR_URI)
assert capture.isOpened(), "Cannot open camera"

while True:
    # Write two messages, each with an envelope and content
    # capture frame-by-frame
    ret, frame = capture.read()
    if not ret:
        print("[+] No frame received. Stream ended.")
        break

    # resize the frame
    frame = cv2.resize(frame, (640, 480))
    encoded, buffer = cv2.imencode(".jpg", frame)

    # all is good
    # cv2.imshow("Frames", frame)

    # stop with Esc key (27)
    if cv2.waitKey(1) == 27:
        break

    sent_frame = base64.b64encode(buffer)
    publisher.send_multipart([b"camera_A", sent_frame])
    time.sleep(0.01)

# We never get here but clean up anyhow
publisher.close()
context.term()
capture.release()
cv2.destroyAllWindows()
# reader.py | subscriber
import numpy as np
import base64
import zmq
import cv2

# Prepare our context and subscriber
context = zmq.Context()
subscriber = context.socket(zmq.SUB)
subscriber.connect("tcp://localhost:5563")
subscriber.setsockopt_string(zmq.SUBSCRIBE, "camera_A")

while True:
    # Read envelope with address
    [address, contents] = subscriber.recv_multipart()
    receive_frame = base64.b64decode(contents)
    frame = np.frombuffer(receive_frame, dtype=np.uint8)
    frame = cv2.imdecode(frame, 1)

    cv2.namedWindow("Frames", cv2.WINDOW_NORMAL)
    cv2.imshow("Frames", frame)

    # stop with Esc key (27)
    if cv2.waitKey(1) == 27:
        break

subscriber.close()
context.term()
cv2.destroyAllWindows()
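One small adjustment (mine, not from the answer above): ZeroMQ messages are binary-safe, so the base64 round-trip is optional and the raw JPEG bytes can be published directly, which avoids the roughly 33% size overhead:

# writer side: send the JPEG bytes directly
publisher.send_multipart([b"camera_A", buffer.tobytes()])

# reader side: decode straight from the received bytes
address, contents = subscriber.recv_multipart()
frame = cv2.imdecode(np.frombuffer(contents, dtype=np.uint8), cv2.IMREAD_COLOR)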
DiskCache
You could also consider using diskcache. It allows passing Python objects between processes; it is similar to Redis but is pure Python and requires no server. Note: pip install --upgrade diskcache. You can adapt it to start sending live frames from a camera / video (a sketch of that variant follows the reader example below).
# writer.py
import time
from pathlib import Path

import diskcache as dc
import cv2

tmp = Path("/tmp/stream")
with dc.Cache(tmp) as cache:
    print(f"[+] Ready to push data to {tmp}.")
    while True:
        img = cv2.imread("cat.jpg")
        cache.push(img, expire=5)
        time.sleep(10)
# reader.py
import time
from pathlib import Path

import diskcache as dc
import cv2

tmp = Path("/tmp/stream")
with dc.Cache(tmp) as cache:
    print(f"[+] Ready to pull data from {tmp}")
    while True:
        (key, value), _ = cache.pull(expire_time=True)
        if key:
            cv2.imshow("cat", value)
            cv2.waitKey(0)
            cv2.destroyAllWindows()
        time.sleep(0.1)
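The camera variant mentioned above could look roughly like this on the writer side (a minimal sketch, not from the original answer, assuming a webcam at index 0; the reader stays the same, ideally with cv2.waitKey(1) instead of cv2.waitKey(0)):

# writer_camera.py (sketch)
from pathlib import Path

import diskcache as dc
import cv2

tmp = Path("/tmp/stream")
capture = cv2.VideoCapture(0)  # assumption: default webcam
assert capture.isOpened(), "Cannot open camera"

with dc.Cache(tmp) as cache:
    print(f"[+] Pushing live frames to {tmp}.")
    while True:
        ret, frame = capture.read()
        if not ret:
            break
        # expire quickly so undisplayed frames don't pile up in the cache
        cache.push(frame, expire=1)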
I would go in this direction rather than sys, because you have full control over the streamed data. See the diskcache Documentation.
You mention that the images to be sent are not of consistent size, but I have to assume that if they come from the same camera (for a given video stream), the raw image size never changes, only the compressed size does. I would guess you probably have enough RAM to hold at least one uncompressed frame in memory at a time, and that you are just introducing all the processing overhead of compressing and decompressing.
Given that, I would create a shared buffer using multiprocessing.shared_memory that can share frames between the two processes (you can even get really fancy and build a circular buffer of a few frames to prevent tearing, but that wasn't a big problem in my testing; a sketch of that idea appears after the reader example below).
Given that cv2.VideoCapture().read() can read straight into an existing array, and that you can create a numpy array which uses the shared memory as its buffer, you can read the data into the shared memory with no extra copies. Using this, I was able to read nearly 700 frames per second from a video file encoded with H.264 at 1280x688 resolution.
from multiprocessing.shared_memory import SharedMemory
import cv2
from time import sleep
import numpy as np

vid_device = r"D:\Videos\movies\GhostintheShell.mp4" #a great movie

#get the first frame to calculate size
cap = cv2.VideoCapture(vid_device)
success, frame = cap.read()
if not success:
    raise Exception("error reading from video")

#create a shared memory for sending the frame shape
frame_shape_shm = SharedMemory(name="frame_shape", create=True, size=frame.ndim*4) #4 bytes per dim as long as int32 is big enough
frame_shape = np.ndarray(3, buffer=frame_shape_shm.buf, dtype='i4')
frame_shape[:] = frame.shape

#create the shared memory for the frame buffer
frame_buffer_shm = SharedMemory(name="frame_buffer", create=True, size=frame.nbytes)
frame_buffer = np.ndarray(frame_shape, buffer=frame_buffer_shm.buf, dtype=frame.dtype)

input("writer is ready: press enter once reader is ready")

try: #use KeyboardInterrupt to quit
    while True:
        cap.read(frame_buffer) #read data into frame buffer
        # sleep(1/24) #limit framerate-ish (hitting actual framerate is more complicated than 1 line)
except KeyboardInterrupt:
    pass

#cleanup: IMPORTANT, close this one first so the reader doesn't unlink() the
# shm's before this file has exited. (less important on windows)
cap.release()
frame_buffer_shm.close()
frame_shape_shm.close()
The reader process looks very similar, but instead of creating a video device and reading frames, we just construct the shared arrays and imshow a bunch. The GUI isn't quite as fast as just dumping the data, so we don't get 700 fps, but up to 500 isn't bad...
from multiprocessing.shared_memory import SharedMemory
import cv2
import numpy as np

#create a shared memory for reading the frame shape
frame_shape_shm = SharedMemory(name="frame_shape")
frame_shape = np.ndarray([3], buffer=frame_shape_shm.buf, dtype='i4')

#create the shared memory for the frame buffer
frame_buffer_shm = SharedMemory(name="frame_buffer")
#create the framebuffer using the shm's memory
frame_buffer = np.ndarray(frame_shape, buffer=frame_buffer_shm.buf, dtype='u1')

try:
    while True:
        cv2.imshow('frame', frame_buffer)
        cv2.waitKey(1) #this is needed for cv2 to update the gui
except KeyboardInterrupt:
    pass

#cleanup: IMPORTANT the writer process should close before this one, so nothing
# tries to access the shm after unlink() is called. (less important on windows)
frame_buffer_shm.close()
frame_buffer_shm.unlink()
frame_shape_shm.close()
frame_shape_shm.unlink()
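As for the circular-buffer idea mentioned above, a minimal double-buffered sketch might look like the following. None of this is from the original answer: the slot count, the shared-memory names ("frame_index", "frame_slots"), and the reader's hard-coded frame shape are illustrative assumptions, and it relies on the same in-place cap.read() trick as above.

# ring_writer.py (sketch)
from multiprocessing.shared_memory import SharedMemory
import numpy as np
import cv2

N_SLOTS = 2  # assumption: double buffering is enough

cap = cv2.VideoCapture(0)  # assumption: default webcam
ok, frame = cap.read()
if not ok:
    raise Exception("error reading from video")

# one byte that always holds the index of the most recently completed slot
index_shm = SharedMemory(name="frame_index", create=True, size=1)
slots_shm = SharedMemory(name="frame_slots", create=True, size=frame.nbytes * N_SLOTS)
slots = np.ndarray((N_SLOTS, *frame.shape), dtype=frame.dtype, buffer=slots_shm.buf)

input("writer is ready: press enter once reader is ready")
try:
    current = 0
    while True:
        nxt = (current + 1) % N_SLOTS
        cap.read(slots[nxt])     # write into the slot the reader is not displaying
        index_shm.buf[0] = nxt   # publish it (single-byte write)
        current = nxt
except KeyboardInterrupt:
    pass
cap.release()
slots_shm.close()
index_shm.close()

# ring_reader.py (sketch)
from multiprocessing.shared_memory import SharedMemory
import numpy as np
import cv2

N_SLOTS = 2
FRAME_SHAPE = (480, 640, 3)  # assumption: must match the writer's frame shape

index_shm = SharedMemory(name="frame_index")
slots_shm = SharedMemory(name="frame_slots")
slots = np.ndarray((N_SLOTS, *FRAME_SHAPE), dtype='u1', buffer=slots_shm.buf)

try:
    while True:
        cv2.imshow("frame", slots[index_shm.buf[0]])
        cv2.waitKey(1)
except KeyboardInterrupt:
    pass
slots_shm.close()
slots_shm.unlink()
index_shm.close()
index_shm.unlink()

The single status byte is the only coordination here; a real implementation would probably add a lock or a frame counter.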
Edit: the user's other question suggested that a Python version earlier than 3.8 may be required (or that it even has to work across versions), so here is an example of using posix_ipc in place of multiprocessing.shared_memory to create the frame buffer (and how to clean it up):
import mmap
import posix_ipc
import numpy as np

#creation
shm = posix_ipc.SharedMemory(name="frame_buf",
                             flags=posix_ipc.O_CREX, #if this fails, cleanup didn't happen properly last time
                             size=frame.nbytes)
shm_map = mmap.mmap(shm.fd, shm.size)
buf = memoryview(shm_map)

#create the frame buffer
frame_buffer = np.ndarray(frame.shape, buffer=buf, dtype=frame.dtype)
frame_buffer[:] = frame[:] #copy first frame into frame buffer

#cleanup
shm.close_fd() #can happen after opening mmap
buf.release() #must happen after frame_buffer is no longer needed and before closing mmap
shm_map.close()
shm.unlink() #must only call from one of the two processes. unlink tells the os to reclaim the space once all handles are closed.
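For completeness, the reader side would attach to the same segment rather than create it; a sketch under the same assumptions (the frame shape still has to be communicated some other way, here it is simply hard-coded):

import mmap
import posix_ipc
import numpy as np

FRAME_SHAPE = (480, 640, 3)  # assumption: must match the writer's frames

# attach to the existing segment (no O_CREX here)
shm = posix_ipc.SharedMemory(name="frame_buf")
shm_map = mmap.mmap(shm.fd, shm.size)
buf = memoryview(shm_map)
shm.close_fd()  # the mmap keeps the mapping alive

frame_buffer = np.ndarray(FRAME_SHAPE, buffer=buf, dtype='u1')
# ... use frame_buffer (e.g. cv2.imshow) ...

#cleanup (after frame_buffer is no longer needed)
buf.release()
shm_map.close()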
Another option is to use ROS publishers and subscribers. It is simple to implement and easy to understand.
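That answer gives no code, so the following is only a rough sketch of what it could look like with ROS 1 (rospy plus cv_bridge); the node and topic names are made up for illustration:

# ros_writer.py (sketch) -- publishes frames on a topic
import rospy
import cv2
from sensor_msgs.msg import Image
from cv_bridge import CvBridge

rospy.init_node("camera_publisher")
pub = rospy.Publisher("camera/image", Image, queue_size=1)
bridge = CvBridge()
cap = cv2.VideoCapture(0)  # assumption: default webcam
rate = rospy.Rate(30)      # ~30 fps

while not rospy.is_shutdown():
    ok, frame = cap.read()
    if not ok:
        break
    pub.publish(bridge.cv2_to_imgmsg(frame, encoding="bgr8"))
    rate.sleep()

# ros_reader.py (sketch) -- displays frames from the topic
import rospy
import cv2
from sensor_msgs.msg import Image
from cv_bridge import CvBridge

bridge = CvBridge()

def callback(msg):
    frame = bridge.imgmsg_to_cv2(msg, desired_encoding="bgr8")
    cv2.imshow("frame", frame)
    cv2.waitKey(1)

rospy.init_node("camera_subscriber")
rospy.Subscriber("camera/image", Image, callback, queue_size=1)
rospy.spin()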