如何通过 ZeroMQ 套接字发送 OpenCV 视频片段?

How to send OpenCV video footage over ZeroMQ sockets?

我有一个简单的网络摄像头,我使用 OpenCV 读取了它,现在我正尝试使用 ZeroMQ 将此视频片段发送到不同的 (Python) 程序。所以我有以下简单的脚本来读取网络摄像头并使用 ZeroMQ 套接字发送它:

import cv2
import os
import zmq
import base64

context = zmq.Context()
footage_socket = context.socket(zmq.PUB)
footage_socket.connect('tcp://localhost:5555')

# init the camera
camera = cv2.VideoCapture(0)

while True:
    try:
        (grabbed, frame) = camera.read()            # grab the current frame
        frame = cv2.resize(frame, (640, 480))       # resize the frame
        footage_socket.send_string(base64.b64encode(frame))

        # Show the video in a window
        cv2.imshow("Frame", frame)                  # show the frame to our screen
        cv2.waitKey(1)                              # Display it at least one ms
        #                                           # before going to the next frame

    except KeyboardInterrupt:
        camera.release()
        cv2.destroyAllWindows()
        print "\n\nBye bye\n"
        break

这很好用,因为它显示了视频并且没有给出任何错误。

我注释掉了显示图像的两行(cv2.imshow()cv2.waitKey(1))。然后我并行启动了下面的脚本。第二个脚本应该接收视频片段并显示它。

import cv2
import zmq
import base64
import numpy as np

context = zmq.Context()
footage_socket = context.socket(zmq.SUB)
footage_socket.bind('tcp://*:5555')
footage_socket.setsockopt_string(zmq.SUBSCRIBE, unicode(''))

# camera = cv2.VideoCapture("output.avi")

while True:
    try:
        frame = footage_socket.recv_string()
        frame = np.fromstring(base64.b64decode(frame), dtype=np.uint8)
        cv2.imshow("Frame", frame)                  # show the frame to our screen
        cv2.waitKey(1)                              # Display it at least one ms
        #                                           # before going to the next frame
    except KeyboardInterrupt:
        cv2.destroyAllWindows()
        break

print "\n\nBye bye\n"

不幸的是,这在 cv2.waitKey(1) 上冻结了。

有人知道我做错了什么吗?我是否需要以不同方式解码素材?欢迎所有提示!

框架对象包含服务器状态的内存状态,当它被发送到客户端时它会冻结,因为它接收到的框架是一个浅拷贝。 尝试查找如何为框架制作深拷贝。

第 0 步:风险清单

鉴于目标明确,您的rapid-prototyping分布式应用基础架构取决于几个风险点。

0) OpenCV cv2 模块大量使用底层 C-based 组件和 python 美化如果尝试使用 cv2.imshow() 设施和 cv2(外部 FSA)window-management & frame-display 服务,则经常挂起。

1) ZeroMQ 框架可以帮助您的不仅仅是尝试将数据强制转换为 (string) 只是为了使用 .send_string() / .recv_string() - 可能一旦将已知图像 ( pxW * pxH )geometry * RGB-depth 移动到更智能的 BLOB-mapped object 中(将在下面的架构展望中更多地讨论这方面),在这里会变得更好。

2) 鉴于第 0 点和第 1 点,ZeroMQ 基础设施(原型设计中的更多)应该变得健壮以应对未处理的异常(这将留下 zmq.Context() 个实例及其关联的 .Socket() children 在分配的资源上挂起,以防 cv2.imshow() 崩溃,因为它在快速原型循环中经常这样做。

因此 try: except: finally: 处理程序和显式初始 .setsockopt( zmq.LINGER, 0 )[ 中的代码的彻底和 self-disciplined 框架=172=] 套接字实例化后立即 + final .close() + context.term() inside the finally: handler section是必须的。


第 1 步:通过发送 SEQ 普通 int-s

来验证 ZeroMQ 部分

最好的第一级问题隔离是设置流程,只提供不受控制的 SEQ 整数,从 PUB.send( ) 广播边.

...                                                   # FOR STREAMING, ALWAYS PREFER DESIGNS USING A NONBLOCKING MODE
SEQ += 1
footage_socket.send( SEQ, zmq.NOBLOCK )               # PUB.send( SEQ ) -> *SUB*
...

除非您的接收方证明它可以稳健地处理 int-s 的流程,否则继续进行下去是没有意义的。

...
aMsgIN = footage_socket.recv( zmq.NOBLOCK )           # FOR STREAMING, ALWAYS PREFER DESIGNS USING A NONBLOCKING MODE
print "{0:}".format( aMsgIN if len( aMsgIN ) > 0 else "." ),
# sleep(...)                                          # backthrottle the loop a bit
...

第 2 步:将 .imshow().recv() 数据和 event-loops

分离

如果您的 data-pumps 按需要工作,远程显示器作为下一个目标开始变得有意义。

ZeroMQ 要么传递完整的消息 ( BLOB ),要么什么都不传递。这是事实。接下来,ZeroMQ 不保证任何人交付 error-free。这些都是事实,您的设计必须接受。

采集是比较简单的部分,只需抓取数据(也许一些色彩空间转换可能会集中在这里,但除此之外,任务是(对于 sub 4K / sub 30fps 图像处理)通常 error-free边.

上面获取的

frame是一个numpy.ndarray实例。发送 hard-coded、binary-mapped BLOB 时将获得最佳性能,没有任何 "smart" 转换,显然,frame 只是一大堆位(尽管有ZeroMQ 的 Zero-copy 机制可以提供一些更高级的美食,但这些在发送方的这个阶段不会直接受益。

#                               struct.unpack() / .pack() seems "just"-enough for fast & smart custom Payload protocol designs
DecodeWireMSG_DATA( aMSG_DATA = struct.unpack( "!" + ( aPayloadHEADER + ( ( pxW * pxH ) * RGB_DEPTH * CELL_SIZE ) ) * "I", aMsgIN ) )

较难的部分在接收端。如果尝试使用隐藏在 .imshow() 中的 cv2 built-in event-loop 引擎,此循环将与您通过 [=33= 读取更新流的外部逻辑发生冲突] 从 PUB 方发布。

作为一种合理的妥协,可以忽略所有 "delayed" frame-s,它们没有使进程与 PUB 端获取/广播节奏同步,并且只显示最近的一张...在 ZeroMQ 传输基础设施上使用 zmq.CONFLATE 选项(延迟 "old" 图片已经失去意义,如果重建的目的flow-of-events 只是一个视觉感知,相反,如果目的是记录完整的获取 1:1,zmq.CONFLATE 将丢弃 frame 个实例,这应该得到处理,因此应该添加一些其他架构来实现这样的 1:1 文档目的,最好与数据流/处理的 "just-visual" 分支分开。

完成此操作后,.recv()(可能是 Poller() + .recv() 循环的组合)将为 SUB 方提供适当的 data-pump,它独立于 cv2 工具,它是 .imshow()(隐藏)-FSA event-loop。


架构和性能提示:

  • 对于更高的 fps + FullHD / 2K / 4K 项目,使用 zmq.Stopwatch() 实例系统地描述 cv2 处理的累积处理延迟/延迟' { .start(), .stop() }方法。

  • 有了硬数据,你可能会及时发现,当出现额外的需求来解决一些甚至 harder-real-time 约束时,所以想想:

  • 主要避免陷入任何无法控制的black-hole的python垃圾回收的风险--始终控制{ gc.disable() | gc.enable();gc.collect() } 围绕您的 critical-path 部分并在您的设计知道可行的地方启动显式 gc.collect()

  • 避免新的内存分配延迟 - 可能 pre-allocate 所有必需的 numpy 数组,稍后仅强制执行 numpy为那些使用数据修改的就地模式s,从而避免任何进一步的 ad-hoc 内存管理关联 wait-states

  • 通过单独 multi-streamed 设计增加的流量 error-immunity,仅对整个大/(彩色)深度图像的部分(条纹)进行独立更新(记住Zero-Warranty - 获取完整的 "fat" 消息或 None )

  • tune-up .Context() 使用 zmq.AFFINITY 的表现将不同的 类 of I/O 流量优先级映射到隔离的 zmq.Context( N ) I/O-threads.

  • fine-tune zmq.SNDBUF + zmq.SNDHWM 在 PUB 端,如果需要 multi-subscribers 并且未使用 zmq.CONFLATE

  • 最后但同样重要的是,可以利用 numba.jit() LLVM-pre-compiled re-useable 代码的加速 critical-path 函数(通常是繁重的 numpy 处理),其中额外的微秒 shaved-off 为您的 video-processing 管道带来最有益的影响,同时仍保持在纯 python (当然,还有一些 cv2 警告)。


在原型制作阶段 cv2 的更多提示和技巧:

可能喜欢 因为 cv2 基于 image-processing。

可能喜欢this用于cv2简单的GUI-interactive参数调优方法。

可能喜欢 cv2 处理管道分析 zmq.Stopwatch() 细节到 [usec]

最后我通过中间步骤解决了问题。我首先将单个图像写入磁盘,然后再次读出这些图像。这让我意识到我需要将帧编码为图像(我选择了 jpg),并且使用魔术方法 cv2.imencode('.jpg', frame)cv2.imdecode(npimg, 1) 我可以让它工作。我在下面粘贴了完整的工作代码。

第一个脚本读取网络摄像头并通过 zeromq 套接字发送镜头:

import cv2
import zmq
import base64

context = zmq.Context()
footage_socket = context.socket(zmq.PUB)
footage_socket.connect('tcp://localhost:5555')

camera = cv2.VideoCapture(0)  # init the camera

while True:
    try:
        (grabbed, frame) = camera.read()  # grab the current frame
        frame = cv2.resize(frame, (640, 480))  # resize the frame
        encoded, buffer = cv2.imencode('.jpg', frame)
        footage_socket.send_string(base64.b64encode(buffer))
        
    except KeyboardInterrupt:
        camera.release()
        cv2.destroyAllWindows()
        print "\n\nBye bye\n"
        break

第二个脚本接收帧图像并显示它们:

import cv2
import zmq
import base64
import numpy as np

context = zmq.Context()
footage_socket = context.socket(zmq.SUB)
footage_socket.bind('tcp://*:5555')
footage_socket.setsockopt_string(zmq.SUBSCRIBE, unicode(''))

while True:
    try:
        frame = footage_socket.recv_string()
        img = base64.b64decode(frame)
        npimg = np.fromstring(img, dtype=np.uint8)
        source = cv2.imdecode(npimg, 1)
        cv2.imshow("image", source)
        cv2.waitKey(1)

    except KeyboardInterrupt:
        cv2.destroyAllWindows()
        print "\n\nBye bye\n"
        break