ZeroMQ 没有收到客户端程序发送的值
ZeroMQ not receiving a value sent by client program
我创建了两个程序来使用 ZeroMQ 发送和接收视频源。但是,接收程序总是卡在 .recv()
方法上。
我为这个程序使用了两个ZeroMQ库:一个是原生的zmq
,另一个是派生的imagezmq
。 imagezmq
用于发送和接收来自视频的帧数据,而本机 zmq
库用于发送和接收图像发送时的时间。
imagezmq
部分工作正常。
程序只卡在 zmq
部分。
以下是我的两个程序:
FinalCam.py
import struct
import time
import imutils
import imagezmq
import cv2
import zmq
import socket
import pickle
# # image sending
sender = imagezmq.ImageSender(connect_to='tcp://localhost:5555')
hostName = socket.gethostname() # send RPi hostname with each image
vid_dir = "/root/redis-opencv-videostream/vtest.avi"
cap = cv2.VideoCapture(vid_dir) # init the camera
context = zmq.Context() # setup for sending time
socket = context.socket(zmq.PUB)
socket.connect("tcp://localhost:6666")
while True: # send images as stream until Ctrl-C
ret, frame = cap.read()
frame = imutils.resize(frame, width=400) # resize without compressionq
captureTime = time.time()
sender.send_image(hostName, frame)
print (captureTime)
captureTime = struct.pack('d', captureTime)
#msg = pickle.dumps(captureTime)
print("message primed")
socket.send(captureTime)
print("time sent")
生成此输出:
1591824603.5772414
message primed
time sent
FinalRecieve.py
import cv2
import imagezmq
import time
import zmq
import struct
FRAMES = 5
image_hub = imagezmq.ImageHub() # image socket
context = zmq.Context() # time socket
socket = context.socket(zmq.SUB)
socket.bind("tcp://*:6666")
while True: # show streamed images until Ctrl-C
loopTime = time.time()
for i in range (0, FRAMES):
hostName, frame = image_hub.recv_image()
image_hub.send_reply(b'OK')
print("recieved image, waiting for time")
captureTime = socket.recv()
print("meow")
print(captureTime)
finishTime = time.time()
fpsTime = finishTime - loopTime
fps = FRAMES / fpsTime
print(fps)
生成此输出:
received image, waiting for time
这里有几件事可以尝试让原生 zmq
部分正常工作:
对SUB
-套接字使用.connect()
-方法:
socket.connect("tcp://localhost:6666")
和 .bind()
-方法用于您的 PUB
-套接字:
socket.bind("tcp://*:6666")
已解释 here in the guide 应该使用 connect 从套接字创建传出连接。
在 .bind()
的同级文档中解释说它用于接受连接。
也尝试设置套接字选项:socket.setsockopt(zmq.SUBSCRIBE, "")
据描述 here in the guide SUB
-sockets 最初会过滤掉所有消息,因此这可以解释为什么您没有收到任何消息。上面的示例提供了一个空过滤器,它接受所有传入消息。
重要的是要注意,对于基于 PUB
和 SUB
的分发,由于其连接时间或网络条件,SUB
可能不一定会收到消息。例如。在订阅者连接之前从发布者发送的所有内容都是不可接收的
问题定义:
"The imagezmq
part works fine. The program only gets stuck on the zmq
part. "
解决方案?
ZeroMQ 非常智能(自 v2.0+ 起),完全不需要任何人严格决定是否 { .bind() | .connect() }
并且一个 AccessPoint 可以自由地 .bind()
一些 .connect()
其他 TransportClass
-1:N
通信拓扑的通道(换句话说,“反向”.bind()/.connect()
是 总是 可能)。
ImageZMQ
-衍生模块并非如此。在您的控制域后面有一组硬连线选择(以及任何硬核劫持 Class-内部属性 { ImageHub.zmq_socket | ImageSender.zmq_socket }
几乎不是 ImageZMQ 作者 ( architecture-(co)-authors ) ) 的梦想。
鉴于 imagezmq
已发布的内部机制本身是预先决定的硬编码选择(相同的 Class 实例,取决于模式,有时 .bind()
和 .connect()
否则) 被宣布为有效,让我们专注于使用它(已发布)的最大值。
基于上述内容,编写您的工作部分,以便将时间发送到工作方案“内部”:
PAYLOAD_MASK = "{0:}|||{1:}"
...
while ...
sender.send_image( PAYLOAD_MASK.format( time.time(), hostName ),
frame
)
并且可以在.recv()
端轻松解码,无需修复使用过的struct.pack()
问题(在distributed-computing,在 ZeroMQ 互连的世界中,人们永远不知道远程平台是什么,字节顺序( Endian-convention )将被假定为“那里”,所以struct.pack()
应始终在 format
字符串中显式声明 Endian 类型。始终。如果不确定,可以自行设计- 检测消息,这可能会挽救这种天真的使用并测试/调整你的 format
-string 用于 .unpack()
-method 的本地使用对于任何一种情况任何隐式盲目 .pack()-sender ... 超出了此 post 的范围,但所有 RPi / 非 RPi 平台项目都倾向于此(仅)-假设-“相同”-Endian 警告)
惊喜:
如果使用 pyzmq
版本 18..,它未在 ImageZMQ 的兼容性列表中列出并且用例注定会崩溃或导致隐患
所以应该从 zmq.pyzmq_version()
检查开始(配置管理中针对受控环境的专业策略,不是吗?)并捕获并解决任何非-匹配案例。
为什么 zmq
部分不起作用?
很难说。没有完整的代码,人们可能只是猜测。 zmq.LINGER
未明确设置为零时,我的候选人会出现错误/丢失终止,这会导致挂断 Context()
- 阻止实际资源直到重新启动的实例。正确使用 ZeroMQ 工具反映了这些防御性编程方法(LINGER
、CONFLATE
、MAXMSGSIZE
、IMMEDIATE
、白名单和许多其他防御性 .setsockopt()
和 Context()
-参数化选项 ),因为 distributed-computing 很复杂,可能会在您的控制域或视线之外的许多地方发生故障。
因此,如果您的架构系统要变得健壮和自我修复,请毫不犹豫地成为一名防御性编程设计师。
aLocalCONTEXT = zmq.Context( nIOthreads )
aSUBsocket = aLocalCONTEXT.socket( zmq.SUB )
try:
aSUBsocket.bind( "tcp://*:6666" )
except:
...
aSUBsocket.setsockopt( zmq.LINGER, 0 )
aSUBsocket.setsockopt( zmq.SUBSCRIBE, b"" )
aSUBsocket.setsockopt( zmq.MAXMSGSIZE, ...)
...
最好重新阅读有关调整其他 ISO-OSI-L2/L3 相关参数以获得最佳性能和最安全 distributed-computing 策略的本机 ZeroMQ API 文档。值得花时间为每个新的 API 更新这样做,自 v2.1+...
以来一直如此
这个的反例?
看看 .send_reply()
method 如何不受其自身 ImageHub
[ 的非 REQ-REP
实例的崩溃保护=116=] Class.
为什么?
到目前为止,对 .send_reply()
方法的非保护调用将在对任何 ImageHub
-class 实例进行任何此类调用时使应用程序崩溃,该实例是在其中初始化的-默认REQ_REP = False
(即在PUB/SUB
模式的SUB
端可用)。
aNonDefaultImageHUB_instance.send( "this will crash" )
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/home/ms/anaconda2/lib/python2.7/site-packages/zmq/sugar/socket.py", line 395, in send
return super(Socket, self).send(data, flags=flags, copy=copy, track=track)
File "zmq/backend/cython/socket.pyx", line 725, in zmq.backend.cython.socket.Socket.send
File "zmq/backend/cython/socket.pyx", line 772, in zmq.backend.cython.socket.Socket.send
File "zmq/backend/cython/socket.pyx", line 247, in zmq.backend.cython.socket._send_copy
File "zmq/backend/cython/socket.pyx", line 242, in zmq.backend.cython.socket._send_copy
File "zmq/backend/cython/checkrc.pxd", line 25, in zmq.backend.cython.checkrc._check_rc
zmq.error.ZMQError: Operation not supported
我创建了两个程序来使用 ZeroMQ 发送和接收视频源。但是,接收程序总是卡在 .recv()
方法上。
我为这个程序使用了两个ZeroMQ库:一个是原生的zmq
,另一个是派生的imagezmq
。 imagezmq
用于发送和接收来自视频的帧数据,而本机 zmq
库用于发送和接收图像发送时的时间。
imagezmq
部分工作正常。
程序只卡在 zmq
部分。
以下是我的两个程序:
FinalCam.py
import struct
import time
import imutils
import imagezmq
import cv2
import zmq
import socket
import pickle
# # image sending
sender = imagezmq.ImageSender(connect_to='tcp://localhost:5555')
hostName = socket.gethostname() # send RPi hostname with each image
vid_dir = "/root/redis-opencv-videostream/vtest.avi"
cap = cv2.VideoCapture(vid_dir) # init the camera
context = zmq.Context() # setup for sending time
socket = context.socket(zmq.PUB)
socket.connect("tcp://localhost:6666")
while True: # send images as stream until Ctrl-C
ret, frame = cap.read()
frame = imutils.resize(frame, width=400) # resize without compressionq
captureTime = time.time()
sender.send_image(hostName, frame)
print (captureTime)
captureTime = struct.pack('d', captureTime)
#msg = pickle.dumps(captureTime)
print("message primed")
socket.send(captureTime)
print("time sent")
生成此输出:
1591824603.5772414
message primed
time sent
FinalRecieve.py
import cv2
import imagezmq
import time
import zmq
import struct
FRAMES = 5
image_hub = imagezmq.ImageHub() # image socket
context = zmq.Context() # time socket
socket = context.socket(zmq.SUB)
socket.bind("tcp://*:6666")
while True: # show streamed images until Ctrl-C
loopTime = time.time()
for i in range (0, FRAMES):
hostName, frame = image_hub.recv_image()
image_hub.send_reply(b'OK')
print("recieved image, waiting for time")
captureTime = socket.recv()
print("meow")
print(captureTime)
finishTime = time.time()
fpsTime = finishTime - loopTime
fps = FRAMES / fpsTime
print(fps)
生成此输出:
received image, waiting for time
这里有几件事可以尝试让原生 zmq
部分正常工作:
对SUB
-套接字使用.connect()
-方法:
socket.connect("tcp://localhost:6666")
和 .bind()
-方法用于您的 PUB
-套接字:
socket.bind("tcp://*:6666")
已解释 here in the guide 应该使用 connect 从套接字创建传出连接。
在 .bind()
的同级文档中解释说它用于接受连接。
也尝试设置套接字选项:socket.setsockopt(zmq.SUBSCRIBE, "")
据描述 here in the guide SUB
-sockets 最初会过滤掉所有消息,因此这可以解释为什么您没有收到任何消息。上面的示例提供了一个空过滤器,它接受所有传入消息。
重要的是要注意,对于基于 PUB
和 SUB
的分发,由于其连接时间或网络条件,SUB
可能不一定会收到消息。例如。在订阅者连接之前从发布者发送的所有内容都是不可接收的
问题定义:
"The
imagezmq
part works fine. The program only gets stuck on thezmq
part. "
解决方案?
ZeroMQ 非常智能(自 v2.0+ 起),完全不需要任何人严格决定是否 { .bind() | .connect() }
并且一个 AccessPoint 可以自由地 .bind()
一些 .connect()
其他 TransportClass
-1:N
通信拓扑的通道(换句话说,“反向”.bind()/.connect()
是 总是 可能)。
ImageZMQ
-衍生模块并非如此。在您的控制域后面有一组硬连线选择(以及任何硬核劫持 Class-内部属性 { ImageHub.zmq_socket | ImageSender.zmq_socket }
几乎不是 ImageZMQ 作者 ( architecture-(co)-authors ) ) 的梦想。
鉴于 imagezmq
已发布的内部机制本身是预先决定的硬编码选择(相同的 Class 实例,取决于模式,有时 .bind()
和 .connect()
否则) 被宣布为有效,让我们专注于使用它(已发布)的最大值。
基于上述内容,编写您的工作部分,以便将时间发送到工作方案“内部”:
PAYLOAD_MASK = "{0:}|||{1:}"
...
while ...
sender.send_image( PAYLOAD_MASK.format( time.time(), hostName ),
frame
)
并且可以在.recv()
端轻松解码,无需修复使用过的struct.pack()
问题(在distributed-computing,在 ZeroMQ 互连的世界中,人们永远不知道远程平台是什么,字节顺序( Endian-convention )将被假定为“那里”,所以struct.pack()
应始终在 format
字符串中显式声明 Endian 类型。始终。如果不确定,可以自行设计- 检测消息,这可能会挽救这种天真的使用并测试/调整你的 format
-string 用于 .unpack()
-method 的本地使用对于任何一种情况任何隐式盲目 .pack()-sender ... 超出了此 post 的范围,但所有 RPi / 非 RPi 平台项目都倾向于此(仅)-假设-“相同”-Endian 警告)
惊喜:
如果使用 pyzmq
版本 18..,它未在 ImageZMQ 的兼容性列表中列出并且用例注定会崩溃或导致隐患
所以应该从 zmq.pyzmq_version()
检查开始(配置管理中针对受控环境的专业策略,不是吗?)并捕获并解决任何非-匹配案例。
为什么 zmq
部分不起作用?
很难说。没有完整的代码,人们可能只是猜测。 zmq.LINGER
未明确设置为零时,我的候选人会出现错误/丢失终止,这会导致挂断 Context()
- 阻止实际资源直到重新启动的实例。正确使用 ZeroMQ 工具反映了这些防御性编程方法(LINGER
、CONFLATE
、MAXMSGSIZE
、IMMEDIATE
、白名单和许多其他防御性 .setsockopt()
和 Context()
-参数化选项 ),因为 distributed-computing 很复杂,可能会在您的控制域或视线之外的许多地方发生故障。
因此,如果您的架构系统要变得健壮和自我修复,请毫不犹豫地成为一名防御性编程设计师。
aLocalCONTEXT = zmq.Context( nIOthreads )
aSUBsocket = aLocalCONTEXT.socket( zmq.SUB )
try:
aSUBsocket.bind( "tcp://*:6666" )
except:
...
aSUBsocket.setsockopt( zmq.LINGER, 0 )
aSUBsocket.setsockopt( zmq.SUBSCRIBE, b"" )
aSUBsocket.setsockopt( zmq.MAXMSGSIZE, ...)
...
最好重新阅读有关调整其他 ISO-OSI-L2/L3 相关参数以获得最佳性能和最安全 distributed-computing 策略的本机 ZeroMQ API 文档。值得花时间为每个新的 API 更新这样做,自 v2.1+...
以来一直如此这个的反例?
看看 .send_reply()
method 如何不受其自身 ImageHub
[ 的非 REQ-REP
实例的崩溃保护=116=] Class.
为什么?
到目前为止,对 .send_reply()
方法的非保护调用将在对任何 ImageHub
-class 实例进行任何此类调用时使应用程序崩溃,该实例是在其中初始化的-默认REQ_REP = False
(即在PUB/SUB
模式的SUB
端可用)。
aNonDefaultImageHUB_instance.send( "this will crash" )
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/home/ms/anaconda2/lib/python2.7/site-packages/zmq/sugar/socket.py", line 395, in send
return super(Socket, self).send(data, flags=flags, copy=copy, track=track)
File "zmq/backend/cython/socket.pyx", line 725, in zmq.backend.cython.socket.Socket.send
File "zmq/backend/cython/socket.pyx", line 772, in zmq.backend.cython.socket.Socket.send
File "zmq/backend/cython/socket.pyx", line 247, in zmq.backend.cython.socket._send_copy
File "zmq/backend/cython/socket.pyx", line 242, in zmq.backend.cython.socket._send_copy
File "zmq/backend/cython/checkrc.pxd", line 25, in zmq.backend.cython.checkrc._check_rc
zmq.error.ZMQError: Operation not supported