How to pass a video stream from one Python process to another?

In my previous post, we found a way to pass an image file from one Python process to another.

Now I am trying to pass a video (a continuous sequence of images):

write.py

import sys
import numpy as np
import cv2
from PIL import Image
import io
import time

while True:
    img = cv2.imread('cat.jpg')
    bimg = cv2.imencode('.jpg',img)[1]
    sys.stdout.buffer.write(bimg)
    sys.stdout.flush()
    time.sleep(1)

read.py:

import sys
from PIL import Image
import io
import cv2
import numpy as np
from io import BytesIO
    
while True:
    data = sys.stdin.buffer.read()
    img_np = cv2.imdecode(np.frombuffer(BytesIO(data).read(), np.uint8), cv2.IMREAD_UNCHANGED)
    cv2.imshow('image', img_np)
    cv2.waitKey(0)

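If I let write.py print its data to the terminal, it prints. If I hand the data to read.py manually, it gets read. But when I put them together (python3 write.py | python3 read.py), it just hangs. write.py writes only once, and read.py never seems to receive anything.

My guess is that the reading code waits for the writing code to "finish" before it wraps up the data and treats it as an image. If that were the case, though, I would expect the flush to fix it.

I think I figured it out. In read.py, sys.stdin.buffer.read() keeps reading until the stdin pipe is closed, but write.py never actually closes its stdout because of the while True loop. This simplified proof of concept reproduces the problem: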

write.py

import sys
import time

sys.stdout.buffer.write(b"Hello world")
sys.stdout.buffer.flush()

# Note: if we comment out the code below, it works again
while True:
    # Keep this alive but don't have `while True:pass`
    # because my computer might crash :D
    time.sleep(10)

read.py

import sys

with open("output.txt", "w") as file:
    file.write(sys.stdin.read())

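This also hangs and never writes anything to "output.txt". If we remove the while True loop from write.py, the code no longer hangs and "Hello world" is written to "output.txt", because when write.py finishes, its process exits and the pipe is closed. To fix this, I suggest changing read.py to something like this: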

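import sys

with open("output.txt", "a") as file:
    while True:
        char = sys.stdin.read(1)  # returns as soon as one character arrives
        if not char:              # empty string: the pipe was closed
            break
        file.write(char)
        file.flush()              # make the data visible right away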
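
The blocking behaviour described above can be reproduced in a single script, without two separate files. The following is a minimal sketch, assuming an `os.pipe()` and a helper thread stand in for the `write.py | read.py` pipeline; the function name `demo_read` and the `delay` parameter are made up for illustration.

```python
import os
import threading
import time


def demo_read(n=None, delay=0.3):
    """Write b"Hello world" into a pipe, keep the write end open for
    `delay` seconds, and report what read(n) returned and how long it took."""
    read_fd, write_fd = os.pipe()

    def writer():
        with os.fdopen(write_fd, "wb") as w:
            w.write(b"Hello world")
            w.flush()           # the data is in the pipe now ...
            time.sleep(delay)   # ... but the write end is still open
        # leaving the with-block closes the write end, so the reader sees EOF

    thread = threading.Thread(target=writer)
    start = time.monotonic()
    thread.start()
    with os.fdopen(read_fd, "rb") as r:
        data = r.read(n)        # n=None means "read until EOF"
    elapsed = time.monotonic() - start
    thread.join()
    return data, elapsed


# A bare read() only returns once the writer has closed its end.
data_eof, elapsed_eof = demo_read(n=None)
# A sized read() can return as soon as that many bytes are available.
data_sized, elapsed_sized = demo_read(n=11)
print(data_eof, round(elapsed_eof, 2))
print(data_sized, round(elapsed_sized, 2))
```

Both calls return the same bytes; the difference is only in when they return, which is why flushing in write.py alone does not help a reader that asks for "everything".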

Solution:

write.py

import sys
import time

MAX_FILE_SIZE = 16 # width of the length header, in bytes

msg = b"Hello world"

# Tell `read.py` how many bytes it needs to read.
length = len(msg)
# `read.py` also has to know how long the length field itself is,
# which is the same problem as before. To fix that, we always send
# the length as exactly `MAX_FILE_SIZE` characters, padded with `0`s
# at the start.
length = str(length).zfill(MAX_FILE_SIZE)
sys.stdout.buffer.write(length.encode())

sys.stdout.buffer.write(msg)
sys.stdout.buffer.flush()

# We also need to tell `read.py` whether this was the last file we send.
# Sending `1` means that there are no more files
sys.stdout.buffer.write(b"1")
sys.stdout.buffer.flush()

# Note: unlike before, `read.py` finishes even though this loop keeps
# the process (and the pipe) open
while True:
    # Keep this alive but don't have `while True:pass`
    # because my computer might crash :D
    time.sleep(10)

read.py

import sys

MAX_FILE_SIZE = 16 # width of the length header, in bytes

while True:
    # Read exactly `MAX_FILE_SIZE` bytes and convert them to an int
    # so that we know the size of the incoming file. `read(n)` blocks
    # until the data arrives, so no `time.sleep` is needed.
    length = int(sys.stdin.buffer.read(MAX_FILE_SIZE))

    # Here you can switch to a different file every time `write.py`
    # sends a new file
    with open("output.txt", "wb") as file:
        file.write(sys.stdin.buffer.read(length))

    file_ended = sys.stdin.buffer.read(1)
    if file_ended == b"1":
        # That was the last file
        break
    # Otherwise loop around and read the next file

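Edit: the solution works like this:

  1. Send the file size
  2. Send the actual file data
  3. Send one byte that tells read.py whether to expect another file

For part 1, we simply encode the length of the file as a string padded with 0s at the front. Note: make sure MAX_FILE_SIZE is larger than the size of the largest file (a large value slightly reduces performance). For part 3, sending "1" means there are no more files to send. Otherwise read.py waits for and accepts the next file. So write.py becomes: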

from math import log
import time
import sys
import cv2


MAX_FILE_SIZE = 62914560 # bytes
# Width of the length header. The bit length of the limit is used here,
# which is more characters than the decimal length ever needs.
MAX_FILE_SIZE = int(log(MAX_FILE_SIZE, 2)+1)


def write_file(buffer, data, last_file=False):
    # Tell `read.py` how many bytes it needs to read. The length is
    # always sent as exactly `MAX_FILE_SIZE` characters, padded with
    # `0`s at the start, so `read.py` knows where it ends.
    length = str(len(data)).zfill(MAX_FILE_SIZE)
    buffer.write(length.encode())

    # Write the actual data
    buffer.write(data)

    # We also need to tell `read.py` whether this was the last file.
    # Sending `1` means that there are no more files
    buffer.write(str(int(last_file)).encode())
    buffer.flush()


while True:
    img = cv2.imread("img.jpg")
    # imencode returns (success, array); send the array as plain bytes
    bimg = cv2.imencode(".jpg", img)[1].tobytes()
    # Call write_file
    write_file(sys.stdout.buffer, bimg, last_file=False)
    # time.sleep(1) # Don't need this

read.py becomes:

from math import log
import numpy as np
import cv2
import sys


MAX_FILE_SIZE = 62914560 # bytes
# Must match the header width computed in `write.py`
MAX_FILE_SIZE = int(log(MAX_FILE_SIZE, 2)+1)


def read(buffer, number_of_bytes):
    output = b""
    while len(output) < number_of_bytes:
        chunk = buffer.read(number_of_bytes - len(output))
        if not chunk:
            # An empty read means the pipe was closed; without this
            # check the loop would spin forever
            raise EOFError("write.py closed the pipe mid-message")
        output += chunk
    return output


def read_file(buffer):
    # Read `MAX_FILE_SIZE` bytes and convert them to an int
    # so that we know the size of the incoming file
    length = int(read(buffer, MAX_FILE_SIZE))

    # Here you can switch to a different file every time `write.py`
    # sends a new file
    data = read(buffer, length)

    # Read a byte so that we know if it is the last file
    file_ended = read(buffer, 1)

    return data, (file_ended == b"1")


while True:
    print("Reading file")
    data, last_file = read_file(sys.stdin.buffer)
    img_np = cv2.imdecode(np.frombuffer(data, np.uint8),
                          cv2.IMREAD_UNCHANGED)
    cv2.imshow("image", img_np)
    cv2.waitKey(1) # waitKey(0) would block until a key is pressed

    if last_file:
        break
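
The same size-then-data idea can also be sketched with a fixed binary header instead of the zero-padded decimal string used above. This is only an illustration under stated assumptions: the 4-byte big-endian header and the function names `write_frame`, `read_exact`, `read_frame` and `iter_frames` are choices made for this sketch, not part of the answer, and the "last file" byte is replaced by simply treating a closed pipe as the end of the stream.

```python
import io
import struct

# Assumed header format: 4-byte big-endian unsigned length (frames < 4 GiB).
HEADER = struct.Struct(">I")


def write_frame(stream, payload):
    """Send one frame as <length header><payload>."""
    stream.write(HEADER.pack(len(payload)))
    stream.write(payload)
    stream.flush()


def read_exact(stream, n):
    """Read exactly n bytes, or raise EOFError if the stream ends early."""
    chunks = []
    remaining = n
    while remaining:
        chunk = stream.read(remaining)
        if not chunk:
            raise EOFError("stream closed in the middle of a frame")
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


def read_frame(stream):
    """Return the next frame, or None on a clean end of stream."""
    first = stream.read(HEADER.size)
    if not first:
        return None  # writer closed the pipe between frames
    if len(first) < HEADER.size:
        first += read_exact(stream, HEADER.size - len(first))
    (length,) = HEADER.unpack(first)
    return read_exact(stream, length)


def iter_frames(stream):
    """Yield frames until the stream ends."""
    while True:
        frame = read_frame(stream)
        if frame is None:
            return
        yield frame


# In-memory round trip; with real processes the streams would be
# sys.stdout.buffer on the writing side and sys.stdin.buffer on the reading side.
pipe = io.BytesIO()
for payload in (b"abc", b"", b"x" * 1000):
    write_frame(pipe, payload)
pipe.seek(0)
frames = list(iter_frames(pipe))
print([len(f) for f in frames])
```

A binary header keeps the per-frame overhead at four bytes and avoids parsing text, at the cost of being harder to inspect by eye than the padded decimal string.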

Two alternative solutions: ZeroMQ | diskcache

Sending frames from one Python file to another is very easy with ZeroMQ.

ZeroMQ


Install it from PyPI: pip install -U pyzmq. There are several ways to send frames. Here is an example using a PUBLISHER and a SUBSCRIBER:

# writer | publisher
import base64
import time
import zmq
import cv2


# Prepare our context and publisher
context = zmq.Context()
publisher = context.socket(zmq.PUB)
publisher.bind("tcp://*:5563")

CAM_INDEX_OR_URI = 0
capture = cv2.VideoCapture(CAM_INDEX_OR_URI)
assert capture.isOpened(), "Cannot open camera"

while True:
    # Each message has two parts: a topic envelope and the frame content

    # capture frame-by-frame
    ret, frame = capture.read()
    if not ret:
        print("[+] No frame received. Stream ended.")
        break

    # resize the frame
    frame = cv2.resize(frame, (640, 480))
    encoded, buffer = cv2.imencode(".jpg", frame)

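    # optional local preview
    # cv2.imshow("Frames", frame)

    # stop with Esc key (27); waitKey only sees key presses while an
    # OpenCV window (such as the preview above) is open
    if cv2.waitKey(1) == 27:
        break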

    sent_frame = base64.b64encode(buffer)
    publisher.send_multipart([b"camera_A", sent_frame])

    time.sleep(0.01)
  

# Clean up once the loop ends
publisher.close()
context.term()

capture.release()
cv2.destroyAllWindows()

# reader.py | subscriber

import numpy as np
import base64
import zmq
import cv2

# Prepare our context and subscriber
context = zmq.Context()
subscriber = context.socket(zmq.SUB)
subscriber.connect("tcp://localhost:5563")
subscriber.setsockopt_string(zmq.SUBSCRIBE, "camera_A")

while True:
    # Read envelope with address
    [address, contents] = subscriber.recv_multipart()

    receive_frame = base64.b64decode(contents)
    frame = np.frombuffer(receive_frame, dtype=np.uint8)
    frame = cv2.imdecode(frame, 1)

 
    cv2.namedWindow("Frames", cv2.WINDOW_NORMAL)
    cv2.imshow("Frames", frame)
   

    # stop with Esc key (27) 
    if cv2.waitKey(1) == 27:
        break

subscriber.close()
context.term()
cv2.destroyAllWindows()
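
The publisher and subscriber above base64-encode each JPEG buffer. ZeroMQ message parts carry arbitrary bytes, so the transport itself does not require that step; it is kept in the example, and dropping it on both sides (for instance sending the encoded buffer's raw bytes) would be an untested variation. A rough sketch of what the encoding costs, using random bytes as a stand-in for one JPEG frame (the helper name `base64_overhead` is made up):

```python
import base64
import os


def base64_overhead(payload):
    """Ratio of the base64-encoded size to the raw size."""
    return len(base64.b64encode(payload)) / len(payload)


payload = os.urandom(30_000)          # stand-in for one JPEG-encoded frame
encoded = base64.b64encode(payload)   # what the publisher above sends
decoded = base64.b64decode(encoded)   # what the subscriber above recovers

print(len(payload), len(encoded), round(base64_overhead(payload), 3))
```

Base64 maps every 3 input bytes to 4 output characters, so each frame grows by about a third on the wire, plus the CPU time for encoding and decoding.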

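diskcache


You could also consider diskcache. It lets processes pass Python objects to each other through a shared cache directory. It is similar to Redis, but it is pure Python and does not require a server. Install it with pip install --upgrade diskcache. You can adapt the example below to send live frames from a camera or video.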

# writer.py
import time
from pathlib import Path
import diskcache as dc
import cv2

tmp = Path("/tmp/stream")

with dc.Cache(tmp) as cache:
    print(f"[+] Ready to push data to {tmp}.")
    while True:
        img = cv2.imread("cat.jpg")
        cache.push(img, expire=5)
        time.sleep(10)


# reader.py

import time
from pathlib import Path
import diskcache as dc
import cv2

tmp = Path("/tmp/stream")

with dc.Cache(tmp) as cache:
    print(f"[+] Ready to pull data from {tmp}")
    while True:
        (key, value), _ = cache.pull(expire_time=True)
        if key:
            cv2.imshow("cat", value)
            cv2.waitKey(0)
            cv2.destroyAllWindows()
        time.sleep(0.1)

I would go in this direction rather than using sys, because you get full control over the streamed data. See the diskcache documentation.

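You mentioned that the images you send vary in size, but I have to assume that if they come from the same camera (for a given video stream), the raw image size does not change, only the compressed image size does. I imagine you have enough RAM to hold at least one uncompressed frame in memory at a time, so all the compression and decompression just adds processing overhead.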
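
To put rough numbers on that claim, here is a back-of-the-envelope sketch. It assumes 8-bit, 3-channel frames (the usual layout OpenCV returns for colour video); the function names are made up for illustration, and the 640x480 size is borrowed from the resize in the ZeroMQ example.

```python
def raw_frame_bytes(width, height, channels=3, bytes_per_channel=1):
    """Size of one uncompressed frame in bytes."""
    return width * height * channels * bytes_per_channel


def raw_bandwidth_mb_per_s(width, height, fps, channels=3):
    """Uncompressed data rate in megabytes (10**6 bytes) per second."""
    return raw_frame_bytes(width, height, channels) * fps / 1e6


vga_frame = raw_frame_bytes(640, 480)
vga_rate = raw_bandwidth_mb_per_s(640, 480, fps=30)
print(vga_frame, "bytes per frame")
print(vga_rate, "MB/s at 30 fps")
```

A single uncompressed frame at that size is under a megabyte, and the size is the same for every frame of the stream, which is what makes a fixed-size shared buffer practical.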

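With that in mind, I would use multiprocessing.shared_memory to create a shared buffer that both processes can use to exchange frames (if you want to get really fancy, you could even create a circular buffer holding a few frames to prevent screen tearing, but that was not a big problem in my testing).

Since cv2.VideoCapture().read() can read directly into an existing array, and you can create a numpy array that uses shared memory as its buffer, you can read the data into shared memory with no extra copy. Using this, I was able to read almost 700 frames per second from an H.264-encoded video file at a resolution of 1280x688.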

from multiprocessing.shared_memory import SharedMemory
import cv2
from time import sleep
import numpy as np

vid_device = r"D:\Videos\movies\GhostintheShell.mp4" #a great movie

#get the first frame to calculate size
cap = cv2.VideoCapture(vid_device)
success, frame = cap.read()
if not success:
    raise Exception("error reading from video")

#create a shared memory for sending the frame shape
frame_shape_shm = SharedMemory(name="frame_shape", create=True, size=frame.ndim*4) #4 bytes per dim as long as int32 is big enough
frame_shape = np.ndarray(3, buffer=frame_shape_shm.buf, dtype='i4')  #4 bytes per dim as long as int32 is big enough
frame_shape[:] = frame.shape

#create the shared memory for the frame buffer
frame_buffer_shm = SharedMemory(name="frame_buffer", create=True, size=frame.nbytes)
frame_buffer = np.ndarray(frame_shape, buffer=frame_buffer_shm.buf, dtype=frame.dtype)

input("writer is ready: press enter once reader is ready")

try: #use keyboardinterrupt to quit
    while True:
        success, _ = cap.read(frame_buffer) #read data into frame buffer
        if not success: #end of the video (or the device stopped delivering)
            break
        # sleep(1/24) #limit framerate-ish (hitting actual framerate is more complicated than 1 line)
except KeyboardInterrupt:
    pass

#cleanup: IMPORTANT, close this one first so the reader doesn't unlink() the 
#  shm's before this file has exited. (less important on windows)
cap.release()
frame_buffer_shm.close()
frame_shape_shm.close()

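The reader process looks very similar, but instead of creating a video device and reading frames, we just construct the shared arrays and call imshow repeatedly. The GUI is not as fast as just dumping the data, so we don't reach 700 fps, but up to 500 is not bad...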

from multiprocessing.shared_memory import SharedMemory
import cv2
import numpy as np

#create a shared memory for reading the frame shape
frame_shape_shm = SharedMemory(name="frame_shape")
frame_shape = np.ndarray([3], buffer=frame_shape_shm.buf, dtype='i4')

#create the shared memory for the frame buffer
frame_buffer_shm = SharedMemory(name="frame_buffer")

#create the framebuffer using the shm's memory
frame_buffer = np.ndarray(frame_shape, buffer=frame_buffer_shm.buf, dtype='u1')
try:
    while True:
        cv2.imshow('frame', frame_buffer)
        cv2.waitKey(1) #this is needed for cv2 to update the gui
except KeyboardInterrupt:
    pass

#cleanup: IMPORTANT the writer process should close before this one, so nothing 
#  tries to access the shm after unlink() is called. (less important on windows)
frame_buffer_shm.close()
frame_buffer_shm.unlink()
frame_shape_shm.close()
frame_shape_shm.unlink()
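
The circular buffer mentioned earlier in this answer could look roughly like the sketch below. It is an assumption-heavy illustration rather than the answer's tested code: the class name `FrameRing`, the 8-byte counter header and the slot layout are invented here, frames are plain bytes so the example needs only the standard library, and there is no locking, so a writer that laps a slow reader could still overwrite a slot while it is being copied.

```python
import struct
from multiprocessing.shared_memory import SharedMemory


class FrameRing:
    """Fixed-size frame slots in shared memory, preceded by a counter
    of how many frames have been written so far (hypothetical layout)."""

    HEADER = struct.Struct("<Q")  # 8-byte frame counter

    def __init__(self, name, frame_bytes, slots=4, create=False):
        self.frame_bytes = frame_bytes
        self.slots = slots
        size = self.HEADER.size + frame_bytes * slots
        # When attaching (create=False) the size argument is not needed.
        self.shm = SharedMemory(name=name, create=create,
                                size=size if create else 0)
        self.name = self.shm.name
        if create:
            self.HEADER.pack_into(self.shm.buf, 0, 0)

    def _offset(self, index):
        return self.HEADER.size + (index % self.slots) * self.frame_bytes

    def write(self, frame):
        if len(frame) != self.frame_bytes:
            raise ValueError("frame has the wrong size")
        (count,) = self.HEADER.unpack_from(self.shm.buf, 0)
        start = self._offset(count)
        self.shm.buf[start:start + self.frame_bytes] = frame
        # Publish the new count only after the slot is completely written.
        self.HEADER.pack_into(self.shm.buf, 0, count + 1)

    def read_latest(self):
        """Return a copy of the most recently completed frame, or None."""
        (count,) = self.HEADER.unpack_from(self.shm.buf, 0)
        if count == 0:
            return None
        start = self._offset(count - 1)
        return bytes(self.shm.buf[start:start + self.frame_bytes])

    def close(self, unlink=False):
        self.shm.close()
        if unlink:
            self.shm.unlink()


# Single-process demonstration; normally the writer and the reader
# would live in separate scripts and share only the name.
writer = FrameRing(None, frame_bytes=4, slots=2, create=True)
reader = FrameRing(writer.name, frame_bytes=4, slots=2)
for frame in (b"aaaa", b"bbbb", b"cccc"):   # the third write wraps around
    writer.write(frame)
latest = reader.read_latest()
print(latest)
reader.close()
writer.close(unlink=True)
```

Because the writer always fills the next slot and the reader only looks at the last completed one, the two sides touch different memory most of the time, which is the property the tearing remark is after.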

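Edit: another question from the user suggests that a Python version older than 3.8 may be required (or even that this has to work across versions), so here is an example that uses posix_ipc instead of multiprocessing.shared_memory to create the frame buffer (and how to clean it up):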

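import mmap
import numpy as np
import posix_ipc

#`frame` is the first frame read from the video, as in the writer above
#creation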
shm = posix_ipc.SharedMemory(name="frame_buf", 
                             flags=posix_ipc.O_CREX, #if this fails, cleanup didn't happen properly last time
                             size=frame.nbytes)
shm_map = mmap.mmap(shm.fd, shm.size)
buf = memoryview(shm_map)
#create the frame buffer
frame_buffer = np.ndarray(frame.shape, buffer=buf, dtype=frame.dtype)
frame_buffer[:] = frame[:] #copy first frame into frame buffer

#cleanup
shm.close_fd() #can happen after opening mmap
buf.release() #must happen after frame_buffer is no longer needed and before closing mmap
shm_map.close()
shm.unlink() #must only call from one of the two processes. unlink tells the os to reclaim the space once all handles are closed.

A related approach is to use ROS publishers and subscribers. It is simple to implement and easy to understand.