ZeroMQ:订阅者订阅的主题数量有限制吗?
ZeroMQ: is there a limitation for the amount of topics that SUBs can subscribe for?
我正在使用 ZeroMQ 的 PUB/SUB 套接字模式。 PUB 制作并发布股票的财务数据。主题设置为每只股票的代码。在SUB端,客户可以根据股票代码订阅自己想要的数据。 PUB用C写,SUB用Python.
写
但是,在测试过程中出现了问题。如果在 SUB 套接字上只设置一个股票代码作为消息过滤器,则一切正常。但是当涉及到大量股票时,程序会在很短的时间内崩溃,报错"Segmentation fault (core dumped)"(详见下文)。
这里是PUB (C)的代码:
while (1) {
int rc = 0;
// send topic
rc = zmq_send(pub_socket, topic, rc, ZMQ_SNDMORE);
if (rc == -1) {
// error handling
}
// send stock data
rc = zmq_send(pub_socket, data, rc, 0);
if (rc == -1) {
// error handling
}
}
这里是SUB (Python)的代码:
import zmq
# initialize a SUB socket
ctx = zmq.Context.instance()
socket = ctx.socket(zmq.SUB)
# set socket options to filter message
for code in code_list:
socket.setsockopt_string(zmq.SUBSCRIBE, code)
socket.connect(PUB_ADDR)
# recv data from PUB
while True:
data = socket.recv()
print(data)
我也用gdb来调试程序。 调试结果 如下:
debug result
有谁知道程序崩溃的原因吗?欢迎任何帮助,谢谢。
更新:
如果我用以下代码替换 setsockopt_string
部分,Python 脚本运行良好。 St运行ge 东西...我需要更深入地研究 setsockopt_string
函数。
Python中的新代码:
socket.setsockopt_string( zmq.SUBSCRIBE, "" )
最新更新:
我运行 @user3666197 提供的脚本,得到调试日志。我select因为日志很长所以只写了几个部分
套接字初始化
setsockopt_string完成
接收一条消息并退出
简介:
PUB
-端使用 ZeroMQ v 4.1.5;
SUB
-side 使用 ZeroMQ Python wrapper 16.0.2
隐含地,这使得 PUB/SUB
模式,与前几代 API 相比,回到 v 2.0,依赖于 PUB
端过滤,而您的 SIGSEGV
指示报告 SUB
端出现问题。
尽管假设过滤是根本原因,但我记得一些关于大树过滤问题的技术辩论,仍然有一个小惊喜,就像一些 post 关于 Trie-搜索,添加的 ""
叶节点也做了一个神奇的服务。如果有帮助,将尝试再次找到此辩论。
ZeroMQ 过滤器中的初始 remarks from Martin Sustrik refer up to 约 ~10,000 个订阅不会产生问题(在进一步设计讨论中有一些更高的数字):
Efficient Subscription Matching
In ZeroMQ, simple tries are used to store and match PUB/SUB
subscriptions. The subscription mechanism was intended for up to 10,000 subscriptions where simple trie works well. However, there are users who use as much as 150,000,000 subscriptions. In such cases there's a need for a more efficient data structure. Thus, nanomsg uses memory-efficient version of Patricia trie instead of simple trie.
For more details check this article.
始终至少使用循序渐进的方法来诊断原因:
一个小小的测试mod化会让你更接近于打开问题的真实信封:
import zmq
pass; print "DEBUG: Ok, zmq imported. [ver:{0:}]".format( zmq.pyzmq_version() )
#_______________________________________________# SETUP ZMQ:
ctx = zmq.Context( 2 ) # Context( nIOthreads )
pass; print "DEBUG: Ok, zmq.Context() instantiated."
socket = ctx.socket( zmq.SUB ) # Socket( .SUB )
pass; print "DEBUG: Ok, Socket instantiated."
socket.connect( PUB_ADDR ) # .connect()
pass; print "DEBUG: Ok, .connect() completed."
socket.setsockopt( zmq.LINGER, 0 ) # explicit LINGER
pass; print "DEBUG: Ok, .setsockopt( LINGER, 0 ) completed."
#_______________________________________________# SET FILTER:
for code in code_list:
pass; print "DEBUG: Going to set SUB side n-th filter: {0: > 1000d}. == [{1:}]".format( code_list.index( code ), repr( code ) ),
socket.setsockopt_string( zmq.SUBSCRIBE, code )
pass; print "DEBUG: Ok, this one was done."
pass; print "DEBUG: Ok, all items from <code_list> exhausted."
#_______________________________________________# LOOP FOREVER:
while True:
try:
print "LOOP: .recv() call."
data = socket.recv()
print "LOOP: .recv()-ed {0:}[B] repr()-ed as [{1:}]".format( len( data ), repr( data ) )
except KeyboardInterrupt():
print "EXC: Ctrl-C will terminate."
except:
print "EXC: will terminate."
finally:
pass; print "DEBUG: Ok, finally: section entered:"
socket.close()
pass; print "DEBUG: Ok, Socket instance .close() call returned"
ctx.term()
pass; print "DEBUG: Ok, .Context() instance term()-ed"
break
鉴于描述的测试用例只有一个 PUB
和一个 SUB
,另一个性能扩展和详细的缓冲区管理问题目前不会引发问题。在 运行 mod-ed 测试和 post 琐碎的 DEBUG:log.
之后会看到结果
每秒发送大约 3k 条消息也不成问题。
更新:遗漏点 -- (1) Unicode 处理 + (2) 主题过滤器
(1) 如 DEBUG:log 所示,您混合了 Unicode 和纯 C 字节数组。 These representations MUST match - system-wide(从 .send_string()
,经过 .setsockopt_string()
,直到 .recv_string()
)
data = socket.recv_string() # AS YOUR DEBUG:log shows the b'mkt_bar...'
(2) 主题-过滤器必须匹配 - 否则消息会被分类为非订阅消息...所以u'abc ....' 过滤器匹配 u'abc ....' 消息。否则:
setsockopt_string( option, optval, encoding='utf-8' )
An empty optval
of length zero shall subscribe to all incoming messages. A non-empty optval
shall subscribe to all messages beginning with the specified prefix. Multiple filters may be attached to a single ZMQ_SUB socket, in which case a message shall be accepted if it matches at least one filter.
DEBUG:log 上面提供的片段显示(好吧,PrintScreens ... -- 请,下次 而不是 copy/paste 终端 ASCII,而不是图片,除非显示一些 GUI 功能,对吗?谢谢...),您的主题过滤器在定义的意义上永远不会匹配。解决这个问题。全系统。
ZeroMQ 不应该归咎于此,Unicode + C 字节数组根本无法工作,如果混合使用或调用接口错误,则头疼。
结语:
如果仍然指责 ZeroMQ 主题过滤能力,最简单的 a/b-test 到(dis)- 批准 Null 假设将是 运行 非常相同的测试,但只有 5 个主题-过滤元件到位。如果这也崩溃了,那么你关于容量相关限制的假设是错误的。
继续走!
我正在使用 ZeroMQ 的 PUB/SUB 套接字模式。 PUB 制作并发布股票的财务数据。主题设置为每只股票的代码。在SUB端,客户可以根据股票代码订阅自己想要的数据。 PUB用C写,SUB用Python.
写但是,在测试过程中出现了问题。如果在 SUB 套接字上只设置一个股票代码作为消息过滤器,则一切正常。但是当涉及到大量股票时,程序会在很短的时间内崩溃,报错"Segmentation fault (core dumped)"(详见下文)。
这里是PUB (C)的代码:
while (1) {
int rc = 0;
// send topic
rc = zmq_send(pub_socket, topic, rc, ZMQ_SNDMORE);
if (rc == -1) {
// error handling
}
// send stock data
rc = zmq_send(pub_socket, data, rc, 0);
if (rc == -1) {
// error handling
}
}
这里是SUB (Python)的代码:
import zmq
# initialize a SUB socket
ctx = zmq.Context.instance()
socket = ctx.socket(zmq.SUB)
# set socket options to filter message
for code in code_list:
socket.setsockopt_string(zmq.SUBSCRIBE, code)
socket.connect(PUB_ADDR)
# recv data from PUB
while True:
data = socket.recv()
print(data)
我也用gdb来调试程序。 调试结果 如下:
debug result
有谁知道程序崩溃的原因吗?欢迎任何帮助,谢谢。
更新:
如果我用以下代码替换 setsockopt_string
部分,Python 脚本运行良好。 St运行ge 东西...我需要更深入地研究 setsockopt_string
函数。
Python中的新代码:
socket.setsockopt_string( zmq.SUBSCRIBE, "" )
最新更新:
我运行 @user3666197 提供的脚本,得到调试日志。我select因为日志很长所以只写了几个部分
套接字初始化
简介:
PUB
-端使用 ZeroMQ v 4.1.5;
SUB
-side 使用 ZeroMQ Python wrapper 16.0.2
隐含地,这使得 PUB/SUB
模式,与前几代 API 相比,回到 v 2.0,依赖于 PUB
端过滤,而您的 SIGSEGV
指示报告 SUB
端出现问题。
尽管假设过滤是根本原因,但我记得一些关于大树过滤问题的技术辩论,仍然有一个小惊喜,就像一些 post 关于 Trie-搜索,添加的 ""
叶节点也做了一个神奇的服务。如果有帮助,将尝试再次找到此辩论。
ZeroMQ 过滤器中的初始 remarks from Martin Sustrik refer up to 约 ~10,000 个订阅不会产生问题(在进一步设计讨论中有一些更高的数字):
Efficient Subscription Matching
In ZeroMQ, simple tries are used to store and match
PUB/SUB
subscriptions. The subscription mechanism was intended for up to 10,000 subscriptions where simple trie works well. However, there are users who use as much as 150,000,000 subscriptions. In such cases there's a need for a more efficient data structure. Thus, nanomsg uses memory-efficient version of Patricia trie instead of simple trie.For more details check this article.
始终至少使用循序渐进的方法来诊断原因:
一个小小的测试mod化会让你更接近于打开问题的真实信封:
import zmq
pass; print "DEBUG: Ok, zmq imported. [ver:{0:}]".format( zmq.pyzmq_version() )
#_______________________________________________# SETUP ZMQ:
ctx = zmq.Context( 2 ) # Context( nIOthreads )
pass; print "DEBUG: Ok, zmq.Context() instantiated."
socket = ctx.socket( zmq.SUB ) # Socket( .SUB )
pass; print "DEBUG: Ok, Socket instantiated."
socket.connect( PUB_ADDR ) # .connect()
pass; print "DEBUG: Ok, .connect() completed."
socket.setsockopt( zmq.LINGER, 0 ) # explicit LINGER
pass; print "DEBUG: Ok, .setsockopt( LINGER, 0 ) completed."
#_______________________________________________# SET FILTER:
for code in code_list:
pass; print "DEBUG: Going to set SUB side n-th filter: {0: > 1000d}. == [{1:}]".format( code_list.index( code ), repr( code ) ),
socket.setsockopt_string( zmq.SUBSCRIBE, code )
pass; print "DEBUG: Ok, this one was done."
pass; print "DEBUG: Ok, all items from <code_list> exhausted."
#_______________________________________________# LOOP FOREVER:
while True:
try:
print "LOOP: .recv() call."
data = socket.recv()
print "LOOP: .recv()-ed {0:}[B] repr()-ed as [{1:}]".format( len( data ), repr( data ) )
except KeyboardInterrupt():
print "EXC: Ctrl-C will terminate."
except:
print "EXC: will terminate."
finally:
pass; print "DEBUG: Ok, finally: section entered:"
socket.close()
pass; print "DEBUG: Ok, Socket instance .close() call returned"
ctx.term()
pass; print "DEBUG: Ok, .Context() instance term()-ed"
break
鉴于描述的测试用例只有一个 PUB
和一个 SUB
,另一个性能扩展和详细的缓冲区管理问题目前不会引发问题。在 运行 mod-ed 测试和 post 琐碎的 DEBUG:log.
每秒发送大约 3k 条消息也不成问题。
更新:遗漏点 -- (1) Unicode 处理 + (2) 主题过滤器
(1) 如 DEBUG:log 所示,您混合了 Unicode 和纯 C 字节数组。 These representations MUST match - system-wide(从 .send_string()
,经过 .setsockopt_string()
,直到 .recv_string()
)
data = socket.recv_string() # AS YOUR DEBUG:log shows the b'mkt_bar...'
(2) 主题-过滤器必须匹配 - 否则消息会被分类为非订阅消息...所以u'abc ....' 过滤器匹配 u'abc ....' 消息。否则:
setsockopt_string( option, optval, encoding='utf-8' )
An emptyoptval
of length zero shall subscribe to all incoming messages. A non-emptyoptval
shall subscribe to all messages beginning with the specified prefix. Multiple filters may be attached to a single ZMQ_SUB socket, in which case a message shall be accepted if it matches at least one filter.
DEBUG:log 上面提供的片段显示(好吧,PrintScreens ... -- 请,下次 而不是 copy/paste 终端 ASCII,而不是图片,除非显示一些 GUI 功能,对吗?谢谢...),您的主题过滤器在定义的意义上永远不会匹配。解决这个问题。全系统。
ZeroMQ 不应该归咎于此,Unicode + C 字节数组根本无法工作,如果混合使用或调用接口错误,则头疼。
结语:
如果仍然指责 ZeroMQ 主题过滤能力,最简单的 a/b-test 到(dis)- 批准 Null 假设将是 运行 非常相同的测试,但只有 5 个主题-过滤元件到位。如果这也崩溃了,那么你关于容量相关限制的假设是错误的。
继续走!