如何通过 ZeroMQ 套接字发送 OpenCV 视频片段?
How to send OpenCV video footage over ZeroMQ sockets?
我有一个简单的网络摄像头,我使用 OpenCV 读取了它,现在我正尝试使用 ZeroMQ 将此视频片段发送到不同的 (Python) 程序。所以我有以下简单的脚本来读取网络摄像头并使用 ZeroMQ 套接字发送它:
import cv2
import os
import zmq
import base64
context = zmq.Context()
footage_socket = context.socket(zmq.PUB)
footage_socket.connect('tcp://localhost:5555')
# init the camera
camera = cv2.VideoCapture(0)
while True:
try:
(grabbed, frame) = camera.read() # grab the current frame
frame = cv2.resize(frame, (640, 480)) # resize the frame
footage_socket.send_string(base64.b64encode(frame))
# Show the video in a window
cv2.imshow("Frame", frame) # show the frame to our screen
cv2.waitKey(1) # Display it at least one ms
# # before going to the next frame
except KeyboardInterrupt:
camera.release()
cv2.destroyAllWindows()
print "\n\nBye bye\n"
break
这很好用,因为它显示了视频并且没有给出任何错误。
我注释掉了显示图像的两行(cv2.imshow()
和 cv2.waitKey(1)
)。然后我并行启动了下面的脚本。第二个脚本应该接收视频片段并显示它。
import cv2
import zmq
import base64
import numpy as np
context = zmq.Context()
footage_socket = context.socket(zmq.SUB)
footage_socket.bind('tcp://*:5555')
footage_socket.setsockopt_string(zmq.SUBSCRIBE, unicode(''))
# camera = cv2.VideoCapture("output.avi")
while True:
try:
frame = footage_socket.recv_string()
frame = np.fromstring(base64.b64decode(frame), dtype=np.uint8)
cv2.imshow("Frame", frame) # show the frame to our screen
cv2.waitKey(1) # Display it at least one ms
# # before going to the next frame
except KeyboardInterrupt:
cv2.destroyAllWindows()
break
print "\n\nBye bye\n"
不幸的是,这在 cv2.waitKey(1)
上冻结了。
有人知道我做错了什么吗?我是否需要以不同方式解码素材?欢迎所有提示!
框架对象包含服务器状态的内存状态,当它被发送到客户端时它会冻结,因为它接收到的框架是一个浅拷贝。
尝试查找如何为框架制作深拷贝。
第 0 步:风险清单
鉴于目标明确,您的rapid-prototyping分布式应用基础架构取决于几个风险点。
0) OpenCV cv2
模块大量使用底层 C-based 组件和 python 美化如果尝试使用 cv2.imshow()
设施和 cv2
(外部 FSA)window-management & frame-display 服务,则经常挂起。
1) ZeroMQ 框架可以帮助您的不仅仅是尝试将数据强制转换为 (string)
只是为了使用 .send_string() / .recv_string()
- 可能一旦将已知图像 ( pxW * pxH )
geometry * RGB-depth 移动到更智能的 BLOB-mapped object 中(将在下面的架构展望中更多地讨论这方面),在这里会变得更好。
2) 鉴于第 0 点和第 1 点,ZeroMQ 基础设施(原型设计中的更多)应该变得健壮以应对未处理的异常(这将留下 zmq.Context()
个实例及其关联的 .Socket()
children 在分配的资源上挂起,以防 cv2.imshow()
崩溃,因为它在快速原型循环中经常这样做。
因此 try: except: finally:
处理程序和显式初始 .setsockopt( zmq.LINGER, 0 )
[ 中的代码的彻底和 self-disciplined 框架=172=] 套接字实例化后立即 + final .close()
+ context.term()
inside the finally:
handler section是必须的。
第 1 步:通过发送 SEQ
普通 int
-s
来验证 ZeroMQ 部分
最好的第一级问题隔离是设置流程,只提供不受控制的 SEQ
整数,从 PUB
.send( )
广播边.
... # FOR STREAMING, ALWAYS PREFER DESIGNS USING A NONBLOCKING MODE
SEQ += 1
footage_socket.send( SEQ, zmq.NOBLOCK ) # PUB.send( SEQ ) -> *SUB*
...
除非您的接收方证明它可以稳健地处理 int-s 的流程,否则继续进行下去是没有意义的。
...
aMsgIN = footage_socket.recv( zmq.NOBLOCK ) # FOR STREAMING, ALWAYS PREFER DESIGNS USING A NONBLOCKING MODE
print "{0:}".format( aMsgIN if len( aMsgIN ) > 0 else "." ),
# sleep(...) # backthrottle the loop a bit
...
第 2 步:将 .imshow()
与 .recv()
数据和 event-loops
分离
如果您的 data-pumps 按需要工作,远程显示器作为下一个目标开始变得有意义。
ZeroMQ 要么传递完整的消息 ( BLOB ),要么什么都不传递。这是事实。接下来,ZeroMQ 不保证任何人交付 error-free。这些都是事实,您的设计必须接受。
采集是比较简单的部分,只需抓取数据(也许一些色彩空间转换可能会集中在这里,但除此之外,任务是(对于 sub 4K / sub 30fps 图像处理)通常 error-free边.
上面获取的frame
是一个numpy.ndarray
实例。发送 hard-coded、binary-mapped BLOB 时将获得最佳性能,没有任何 "smart" 转换,显然,frame
只是一大堆位(尽管有ZeroMQ 的 Zero-copy 机制可以提供一些更高级的美食,但这些在发送方的这个阶段不会直接受益。
# struct.unpack() / .pack() seems "just"-enough for fast & smart custom Payload protocol designs
DecodeWireMSG_DATA( aMSG_DATA = struct.unpack( "!" + ( aPayloadHEADER + ( ( pxW * pxH ) * RGB_DEPTH * CELL_SIZE ) ) * "I", aMsgIN ) )
较难的部分在接收端。如果尝试使用隐藏在 .imshow()
中的 cv2
built-in event-loop 引擎,此循环将与您通过 [=33= 读取更新流的外部逻辑发生冲突] 从 PUB
方发布。
作为一种合理的妥协,可以忽略所有 "delayed" frame
-s,它们没有使进程与 PUB
端获取/广播节奏同步,并且只显示最近的一张...在 ZeroMQ 传输基础设施上使用 zmq.CONFLATE
选项(延迟 "old" 图片已经失去意义,如果重建的目的flow-of-events 只是一个视觉感知,相反,如果目的是记录完整的获取 1:1,zmq.CONFLATE
将丢弃 frame
个实例,这应该得到处理,因此应该添加一些其他架构来实现这样的 1:1 文档目的,最好与数据流/处理的 "just-visual" 分支分开。
完成此操作后,.recv()
(可能是 Poller()
+ .recv()
循环的组合)将为 SUB
方提供适当的 data-pump,它独立于 cv2
工具,它是 .imshow()
(隐藏)-FSA event-loop。
架构和性能提示:
对于更高的 fps + FullHD / 2K / 4K 项目,使用 zmq.Stopwatch() 实例系统地描述 cv2
处理的累积处理延迟/延迟' { .start(), .stop() }
方法。
有了硬数据,你可能会及时发现,当出现额外的需求来解决一些甚至 harder-real-time 约束时,所以想想:
主要避免陷入任何无法控制的black-hole的python垃圾回收的风险--始终控制{ gc.disable() | gc.enable();gc.collect() }
围绕您的 critical-path 部分并在您的设计知道可行的地方启动显式 gc.collect()
。
避免新的内存分配延迟 - 可能 pre-allocate 所有必需的 numpy
数组,稍后仅强制执行 numpy
为那些使用数据修改的就地模式s,从而避免任何进一步的 ad-hoc 内存管理关联 wait-states
通过单独 multi-streamed 设计增加的流量 error-immunity,仅对整个大/(彩色)深度图像的部分(条纹)进行独立更新(记住Zero-Warranty - 获取完整的 "fat" 消息或 None
)
tune-up .Context()
使用 zmq.AFFINITY
的表现将不同的 类 of I/O 流量优先级映射到隔离的 zmq.Context( N )
I/O-threads.
fine-tune zmq.SNDBUF
+ zmq.SNDHWM
在 PUB 端,如果需要 multi-subscribers 并且未使用 zmq.CONFLATE
。
最后但同样重要的是,可以利用 numba.jit()
LLVM-pre-compiled re-useable 代码的加速 critical-path 函数(通常是繁重的 numpy
处理),其中额外的微秒 shaved-off 为您的 video-processing 管道带来最有益的影响,同时仍保持在纯 python (当然,还有一些 cv2
警告)。
在原型制作阶段 cv2
的更多提示和技巧:
可能喜欢 因为 cv2
基于 image-processing。
可能喜欢this用于cv2
简单的GUI-interactive参数调优方法。
可能喜欢 cv2
处理管道分析 zmq.Stopwatch()
细节到 [usec]
最后我通过中间步骤解决了问题。我首先将单个图像写入磁盘,然后再次读出这些图像。这让我意识到我需要将帧编码为图像(我选择了 jpg),并且使用魔术方法 cv2.imencode('.jpg', frame)
和 cv2.imdecode(npimg, 1)
我可以让它工作。我在下面粘贴了完整的工作代码。
第一个脚本读取网络摄像头并通过 zeromq 套接字发送镜头:
import cv2
import zmq
import base64
context = zmq.Context()
footage_socket = context.socket(zmq.PUB)
footage_socket.connect('tcp://localhost:5555')
camera = cv2.VideoCapture(0) # init the camera
while True:
try:
(grabbed, frame) = camera.read() # grab the current frame
frame = cv2.resize(frame, (640, 480)) # resize the frame
encoded, buffer = cv2.imencode('.jpg', frame)
footage_socket.send_string(base64.b64encode(buffer))
except KeyboardInterrupt:
camera.release()
cv2.destroyAllWindows()
print "\n\nBye bye\n"
break
第二个脚本接收帧图像并显示它们:
import cv2
import zmq
import base64
import numpy as np
context = zmq.Context()
footage_socket = context.socket(zmq.SUB)
footage_socket.bind('tcp://*:5555')
footage_socket.setsockopt_string(zmq.SUBSCRIBE, unicode(''))
while True:
try:
frame = footage_socket.recv_string()
img = base64.b64decode(frame)
npimg = np.fromstring(img, dtype=np.uint8)
source = cv2.imdecode(npimg, 1)
cv2.imshow("image", source)
cv2.waitKey(1)
except KeyboardInterrupt:
cv2.destroyAllWindows()
print "\n\nBye bye\n"
break
我有一个简单的网络摄像头,我使用 OpenCV 读取了它,现在我正尝试使用 ZeroMQ 将此视频片段发送到不同的 (Python) 程序。所以我有以下简单的脚本来读取网络摄像头并使用 ZeroMQ 套接字发送它:
import cv2
import os
import zmq
import base64
context = zmq.Context()
footage_socket = context.socket(zmq.PUB)
footage_socket.connect('tcp://localhost:5555')
# init the camera
camera = cv2.VideoCapture(0)
while True:
try:
(grabbed, frame) = camera.read() # grab the current frame
frame = cv2.resize(frame, (640, 480)) # resize the frame
footage_socket.send_string(base64.b64encode(frame))
# Show the video in a window
cv2.imshow("Frame", frame) # show the frame to our screen
cv2.waitKey(1) # Display it at least one ms
# # before going to the next frame
except KeyboardInterrupt:
camera.release()
cv2.destroyAllWindows()
print "\n\nBye bye\n"
break
这很好用,因为它显示了视频并且没有给出任何错误。
我注释掉了显示图像的两行(cv2.imshow()
和 cv2.waitKey(1)
)。然后我并行启动了下面的脚本。第二个脚本应该接收视频片段并显示它。
import cv2
import zmq
import base64
import numpy as np
context = zmq.Context()
footage_socket = context.socket(zmq.SUB)
footage_socket.bind('tcp://*:5555')
footage_socket.setsockopt_string(zmq.SUBSCRIBE, unicode(''))
# camera = cv2.VideoCapture("output.avi")
while True:
try:
frame = footage_socket.recv_string()
frame = np.fromstring(base64.b64decode(frame), dtype=np.uint8)
cv2.imshow("Frame", frame) # show the frame to our screen
cv2.waitKey(1) # Display it at least one ms
# # before going to the next frame
except KeyboardInterrupt:
cv2.destroyAllWindows()
break
print "\n\nBye bye\n"
不幸的是,这在 cv2.waitKey(1)
上冻结了。
有人知道我做错了什么吗?我是否需要以不同方式解码素材?欢迎所有提示!
框架对象包含服务器状态的内存状态,当它被发送到客户端时它会冻结,因为它接收到的框架是一个浅拷贝。 尝试查找如何为框架制作深拷贝。
第 0 步:风险清单
鉴于目标明确,您的rapid-prototyping分布式应用基础架构取决于几个风险点。
0) OpenCV cv2
模块大量使用底层 C-based 组件和 python 美化如果尝试使用 cv2.imshow()
设施和 cv2
(外部 FSA)window-management & frame-display 服务,则经常挂起。
1) ZeroMQ 框架可以帮助您的不仅仅是尝试将数据强制转换为 (string)
只是为了使用 .send_string() / .recv_string()
- 可能一旦将已知图像 ( pxW * pxH )
geometry * RGB-depth 移动到更智能的 BLOB-mapped object 中(将在下面的架构展望中更多地讨论这方面),在这里会变得更好。
2) 鉴于第 0 点和第 1 点,ZeroMQ 基础设施(原型设计中的更多)应该变得健壮以应对未处理的异常(这将留下 zmq.Context()
个实例及其关联的 .Socket()
children 在分配的资源上挂起,以防 cv2.imshow()
崩溃,因为它在快速原型循环中经常这样做。
因此 try: except: finally:
处理程序和显式初始 .setsockopt( zmq.LINGER, 0 )
[ 中的代码的彻底和 self-disciplined 框架=172=] 套接字实例化后立即 + final .close()
+ context.term()
inside the finally:
handler section是必须的。
第 1 步:通过发送 SEQ
普通 int
-s
来验证 ZeroMQ 部分
最好的第一级问题隔离是设置流程,只提供不受控制的 SEQ
整数,从 PUB
.send( )
广播边.
... # FOR STREAMING, ALWAYS PREFER DESIGNS USING A NONBLOCKING MODE
SEQ += 1
footage_socket.send( SEQ, zmq.NOBLOCK ) # PUB.send( SEQ ) -> *SUB*
...
除非您的接收方证明它可以稳健地处理 int-s 的流程,否则继续进行下去是没有意义的。
...
aMsgIN = footage_socket.recv( zmq.NOBLOCK ) # FOR STREAMING, ALWAYS PREFER DESIGNS USING A NONBLOCKING MODE
print "{0:}".format( aMsgIN if len( aMsgIN ) > 0 else "." ),
# sleep(...) # backthrottle the loop a bit
...
第 2 步:将 .imshow()
与 .recv()
数据和 event-loops
分离
如果您的 data-pumps 按需要工作,远程显示器作为下一个目标开始变得有意义。
ZeroMQ 要么传递完整的消息 ( BLOB ),要么什么都不传递。这是事实。接下来,ZeroMQ 不保证任何人交付 error-free。这些都是事实,您的设计必须接受。
采集是比较简单的部分,只需抓取数据(也许一些色彩空间转换可能会集中在这里,但除此之外,任务是(对于 sub 4K / sub 30fps 图像处理)通常 error-free边.
上面获取的frame
是一个numpy.ndarray
实例。发送 hard-coded、binary-mapped BLOB 时将获得最佳性能,没有任何 "smart" 转换,显然,frame
只是一大堆位(尽管有ZeroMQ 的 Zero-copy 机制可以提供一些更高级的美食,但这些在发送方的这个阶段不会直接受益。
# struct.unpack() / .pack() seems "just"-enough for fast & smart custom Payload protocol designs
DecodeWireMSG_DATA( aMSG_DATA = struct.unpack( "!" + ( aPayloadHEADER + ( ( pxW * pxH ) * RGB_DEPTH * CELL_SIZE ) ) * "I", aMsgIN ) )
较难的部分在接收端。如果尝试使用隐藏在 .imshow()
中的 cv2
built-in event-loop 引擎,此循环将与您通过 [=33= 读取更新流的外部逻辑发生冲突] 从 PUB
方发布。
作为一种合理的妥协,可以忽略所有 "delayed" frame
-s,它们没有使进程与 PUB
端获取/广播节奏同步,并且只显示最近的一张...在 ZeroMQ 传输基础设施上使用 zmq.CONFLATE
选项(延迟 "old" 图片已经失去意义,如果重建的目的flow-of-events 只是一个视觉感知,相反,如果目的是记录完整的获取 1:1,zmq.CONFLATE
将丢弃 frame
个实例,这应该得到处理,因此应该添加一些其他架构来实现这样的 1:1 文档目的,最好与数据流/处理的 "just-visual" 分支分开。
完成此操作后,.recv()
(可能是 Poller()
+ .recv()
循环的组合)将为 SUB
方提供适当的 data-pump,它独立于 cv2
工具,它是 .imshow()
(隐藏)-FSA event-loop。
架构和性能提示:
对于更高的 fps + FullHD / 2K / 4K 项目,使用 zmq.Stopwatch() 实例系统地描述
cv2
处理的累积处理延迟/延迟'{ .start(), .stop() }
方法。有了硬数据,你可能会及时发现,当出现额外的需求来解决一些甚至 harder-real-time 约束时,所以想想:
主要避免陷入任何无法控制的black-hole的python垃圾回收的风险--始终控制
{ gc.disable() | gc.enable();gc.collect() }
围绕您的 critical-path 部分并在您的设计知道可行的地方启动显式gc.collect()
。避免新的内存分配延迟 - 可能 pre-allocate 所有必需的
numpy
数组,稍后仅强制执行numpy
为那些使用数据修改的就地模式s,从而避免任何进一步的 ad-hoc 内存管理关联 wait-states通过单独 multi-streamed 设计增加的流量 error-immunity,仅对整个大/(彩色)深度图像的部分(条纹)进行独立更新(记住Zero-Warranty - 获取完整的 "fat" 消息或
None
)tune-up
.Context()
使用zmq.AFFINITY
的表现将不同的 类 of I/O 流量优先级映射到隔离的zmq.Context( N )
I/O-threads.fine-tune
zmq.SNDBUF
+zmq.SNDHWM
在 PUB 端,如果需要 multi-subscribers 并且未使用zmq.CONFLATE
。最后但同样重要的是,可以利用
numba.jit()
LLVM-pre-compiled re-useable 代码的加速 critical-path 函数(通常是繁重的numpy
处理),其中额外的微秒 shaved-off 为您的 video-processing 管道带来最有益的影响,同时仍保持在纯 python (当然,还有一些cv2
警告)。
在原型制作阶段 cv2
的更多提示和技巧:
可能喜欢 cv2
基于 image-processing。
可能喜欢this用于cv2
简单的GUI-interactive参数调优方法。
可能喜欢 cv2
处理管道分析 zmq.Stopwatch()
细节到 [usec]
最后我通过中间步骤解决了问题。我首先将单个图像写入磁盘,然后再次读出这些图像。这让我意识到我需要将帧编码为图像(我选择了 jpg),并且使用魔术方法 cv2.imencode('.jpg', frame)
和 cv2.imdecode(npimg, 1)
我可以让它工作。我在下面粘贴了完整的工作代码。
第一个脚本读取网络摄像头并通过 zeromq 套接字发送镜头:
import cv2
import zmq
import base64
context = zmq.Context()
footage_socket = context.socket(zmq.PUB)
footage_socket.connect('tcp://localhost:5555')
camera = cv2.VideoCapture(0) # init the camera
while True:
try:
(grabbed, frame) = camera.read() # grab the current frame
frame = cv2.resize(frame, (640, 480)) # resize the frame
encoded, buffer = cv2.imencode('.jpg', frame)
footage_socket.send_string(base64.b64encode(buffer))
except KeyboardInterrupt:
camera.release()
cv2.destroyAllWindows()
print "\n\nBye bye\n"
break
第二个脚本接收帧图像并显示它们:
import cv2
import zmq
import base64
import numpy as np
context = zmq.Context()
footage_socket = context.socket(zmq.SUB)
footage_socket.bind('tcp://*:5555')
footage_socket.setsockopt_string(zmq.SUBSCRIBE, unicode(''))
while True:
try:
frame = footage_socket.recv_string()
img = base64.b64decode(frame)
npimg = np.fromstring(img, dtype=np.uint8)
source = cv2.imdecode(npimg, 1)
cv2.imshow("image", source)
cv2.waitKey(1)
except KeyboardInterrupt:
cv2.destroyAllWindows()
print "\n\nBye bye\n"
break