ZeroMQ with Chapel and Python: "Operation cannot be accomplished in current state"
I can't tell where the error is, but I am trying to pass messages between a Python client and a Chapel server. The client code is
import zmq
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")
for request in range(10):
    print("Sending request %s ..." % request)
    socket.send(str("Yo"))
    message = socket.recv()
    print("OMG!! He said %s" % message)
The Chapel server is
use ZMQ;
var context: Context;
var socket = context.socket(ZMQ.REP);
socket.bind("tcp://*:5555");
while ( 1 < 2) {
  var msg = socket.recv(string);
  socket.recv(string);
  writeln("got something");
  socket.send("back from chapel");
}
The error message looks ordinary enough, but I don't really understand it.
server.chpl:7: error: halt reached - Error in Socket.recv(string): Operation cannot be accomplished in current state
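I think I am sending and receiving on both sides. The original Chapel example on the Chapel site ran fine, but I ran into trouble once I modified it.
Update
With help from the Chapel team in this thread, it now works.
client.py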
import zmq
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")
for request in range(10):
    message = "Hello %i from Python" % request
    print("[Python] Sending request: %s" % message)
    socket.send_string(message)
    message = socket.recv_string()
    print("[Python] Received response: %s" % message)
server.chpl
use ZMQ;
var context: Context;
var socket = context.socket(ZMQ.REP);
socket.bind("tcp://*:5555");
for i in 0..#10 {
  var msg = socket.recv(string);
  writeln("[Chapel] Received message: ", msg);
  socket.send("Hello %i from Chapel".format(i));
}
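Until the Chapel team has resolved and re-confirmed the Chapel ZMQ module services, kindly test them with int payloads only, and possibly avoid the PUB/SUB archetype (because of the pending string-matching issue).
As @Nick has recently explained, there is still a way to go before the ZMQ services meet ZeroMQ API compliance and fully open the door to cross-compatible, heterogeneous distributed systems: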
To send a string, Chapel sends one message with the string size followed by another message with the byte buffer; receiving works similarly. That means that your one call to <aSocket>.recv( string ) was actually making two back-to-back calls to zmq_recv() under the hood. With the REQ/REP pattern, those two back-to-back zmq_recv() calls put the ZeroMQ state machine into an invalid state, hence the error message. This is definitely a bug with Chapel's ZMQ module.
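The failure mode in the quote can be reproduced without any networking. What follows is a toy model, not libzmq: `ToyRep`, `StateError` and `chapel_style_recv_string` are hypothetical names made up for this sketch. `ToyRep` mimics only one REP rule (a fully received request must be answered before the next receive), and `chapel_style_recv_string` stands in for the two zmq_recv() calls the quote describes.

```python
class StateError(Exception):
    """Stands in for 'Operation cannot be accomplished in current state'."""


class ToyRep:
    """Toy model of a REP socket: read one whole request, then send one reply."""

    def __init__(self):
        self._frames = []          # frames of the request currently being read
        self._must_reply = False   # True once a whole request has been read

    def deliver(self, frames):
        """Simulate a peer's request arriving as a list of frames."""
        self._frames = list(frames)

    def recv_frame(self):
        """Model of a single zmq_recv() call: return the next request frame."""
        if self._must_reply:
            raise StateError("recv while a reply is owed")
        frame = self._frames.pop(0)   # a real socket would block if empty
        if not self._frames:          # last frame read: request is complete
            self._must_reply = True
        return frame

    def send(self, payload):
        """Model of the reply: only legal after a complete request."""
        if not self._must_reply:
            raise StateError("send without a pending request")
        self._must_reply = False


def chapel_style_recv_string(sock):
    """Two frame reads per string, as in the quote: size first, then bytes."""
    size = sock.recv_frame()
    data = sock.recv_frame()
    return size, data


rep = ToyRep()
rep.deliver([b"Yo"])   # the Python client sends its string as a single frame
try:
    chapel_style_recv_string(rep)
    outcome = "ok"
except StateError as exc:
    outcome = "state error: %s" % exc
print(outcome)
```

The first read consumes the only frame and completes the request, so the second read is attempted while a reply is owed. A request that really arrives as two frames passes through the same function without error.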
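A few steps to make the scene clearer:
Before diagnosing the root cause, let me propose a few measures to take. ZeroMQ is a very powerful framework, and it is hard to find a harder (and more fragile) messaging archetype than REQ/REP.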
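Its internal finite-state automata (in fact a distributed FSA) are blocking on both sides. This is by design, to enforce a pendulum-like passing of messages between the connected peers (which need not be just the first two). A sequence of .send(), .recv(), .send(), .recv(), ... on side [A] therefore has to match a sequence of .recv(), .send(), .recv(), ... on side [B]. This dFSA also has a principally unsalvageable mutual deadlock: it occurs if, for whatever reason, both sides enter a waiting state in which [A] and [B] each expect to receive the next message from the opposite end of the channel.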
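The pendulum rule can be sketched as a small check over two operation lists. This is an illustration only: `check_lockstep` is a hypothetical helper, the lists simply record what each peer is doing at each step, and a real REQ or REP socket would in many cases reject an out-of-order call with a state error before any deadlock shows up.

```python
def check_lockstep(ops_a, ops_b):
    """Walk two peers' operation lists in step and report the first mismatch."""
    for step, (a, b) in enumerate(zip(ops_a, ops_b)):
        if a == "recv" and b == "recv":
            return "deadlock at step %d: both peers wait to receive" % step
        if a == "send" and b == "send":
            return "violation at step %d: both peers try to send" % step
    return "ok"


requester = ["send", "recv", "send", "recv"]
replier = ["recv", "send", "recv", "send"]
print(check_lockstep(requester, replier))   # ok

# Here [B] is back in a receive while [A] still awaits its reply.
stuck_replier = ["recv", "recv", "send", "recv"]
print(check_lockstep(requester, stuck_replier))
```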
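That said, my advice is to start with the simplest possible test: use a pair of unrestricted simplex channels (either [A]PUSH/[B]PULL + [B]PUSH/[A]PULL, or a more complicated scheme using PUB/SUB).
The setup will not go into a fully meshed, multi-agent infrastructure, but a simplified version of it (there is no need and no intention to use ROUTER/DEALER channels, though the (reversed) PUSH/PULL channels may be replicated if the model scheme gets extended):
Because of the constraints of the current Chapel implementation, more effort will be spent on the implicit limitations: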
In Chapel, sending or receiving messages on a Socket uses multipart messages and the Reflection module to serialize primitive and user-defined data types whenever possible. Currently, the ZMQ module serializes primitive numeric types, strings, and records composed of these types. Strings are encoded as a length (as int) followed by the character array (in bytes).
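This matters if these remarks describe more than wire-level internals and extend to the top-level ZeroMQ messaging/signalling layer (see the details of subscription management, where ZeroMQ topic-filter matching is an exact, left-to-right prefix match against the received message).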
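A short sketch of why the encoding could interfere with subscriptions. It assumes the length travels as a 64-bit little-endian integer in its own frame (the quoted documentation says only "a length (as int)", so the byte order is a guess), and `chapel_style_frames`, `decode_chapel_style` and `topic_matches` are hypothetical helpers, not part of any library.

```python
import struct


def chapel_style_frames(text):
    """Encode a string as two frames: its byte length, then the bytes.

    Assumption: the length is a 64-bit little-endian signed integer.
    """
    payload = text.encode("utf-8")
    return [struct.pack("<q", len(payload)), payload]


def decode_chapel_style(frames):
    """Inverse of chapel_style_frames, checking the announced length."""
    (size,) = struct.unpack("<q", frames[0])
    payload = frames[1]
    if size != len(payload):
        raise ValueError("announced %d bytes, got %d" % (size, len(payload)))
    return payload.decode("utf-8")


def topic_matches(subscription, first_frame):
    """Topic filter as a byte-wise prefix match on the start of the message."""
    return first_frame.startswith(subscription)


frames = chapel_style_frames("[chapel2python.HB]")
print(decode_chapel_style(frames))                        # [chapel2python.HB]
print(topic_matches(b"[chapel2python.HB]", frames[0]))   # False
print(topic_matches(b"", frames[0]))                      # True
```

Under these assumptions the message begins with the length bytes, so a subscription to the literal topic text never matches, while the empty subscription matches everything.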
The Python side enjoys much more design freedom:
#
# python
# #########
import time
import zmq

context = zmq.Context()
print( "INF: This Agent uses ZeroMQ v.{0:}".format( zmq.zmq_version() ) )

dataAB = context.socket( zmq.REQ )
dataAB.setsockopt( zmq.LINGER, 0 )          # ( a must in pre v4.0+ )
dataAB.connect( "tcp://localhost:5555" )

heartB = context.socket( zmq.SUB )
heartB.setsockopt( zmq.LINGER, 0 )          # ( a must in pre v4.0+ )
heartB.setsockopt( zmq.CONFLATE, 0 )        # ( 0 is the default; 1 would keep just the last message )
heartB.connect( "tcp://localhost:6666" )
heartB.setsockopt( zmq.SUBSCRIBE, b"[chapel2python.HB]" )
heartB.setsockopt( zmq.SUBSCRIBE, b"" )     # in case [Chapel] complicates serialisation
# -------------------------------------------------------------------
while True:
    print( "INF: waiting for a [Chapel] HeartBeat-Message" )
    try:
        hbIN = heartB.recv( zmq.NOBLOCK )   # raises zmq.Again if nothing has arrived yet
    except zmq.Again:
        time.sleep( 0.5 )
        continue
    print( "ACK: [Chapel] Heart-Beat-Message .recv()-ed" )
    break
# -------------------------------------------------------------------
for request in range( 10 ):
    print( "INF: Sending a request %s to [Chapel] ..." % request )
    dataAB.send( b"Yo" )                    # payload must be bytes on Python 3
    print( "INF: a blocking .recv(), [Chapel] is to answer ..." )
    message = dataAB.recv()
    print( "INF: [Chapel] said %s" % message )
# -------------------------------------------------------------------
dataAB.close()
heartB.close()
context.term()
# -------------------------------------------------------------------
Some further try:/except:/finally: constructs should handle a KeyboardInterrupt raised inside the infinite while loops and the like, but they are omitted here for clarity.
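For completeness, one possible shape of such a construct. This is a generic sketch, not tied to pyzmq: `poll_until_ready`, `poll_once` and `cleanup` are hypothetical names, where `poll_once` would wrap the non-blocking receive and `cleanup` would close the sockets and terminate the context.

```python
import time


def poll_until_ready(poll_once, cleanup, pause=0.5, sleep=time.sleep):
    """Call poll_once() until it returns a non-None value.

    cleanup() always runs, including when Ctrl-C interrupts the loop.
    Returns the polled value, or None if the loop was interrupted.
    """
    try:
        while True:
            value = poll_once()
            if value is not None:
                return value
            sleep(pause)
    except KeyboardInterrupt:
        print("INF: interrupted, shutting down")
        return None
    finally:
        cleanup()   # e.g. close the sockets, then terminate the context


# Demonstration with stand-ins: the third poll simulates a Ctrl-C.
calls = {"polls": 0, "cleaned": False}


def fake_poll():
    calls["polls"] += 1
    if calls["polls"] == 3:
        raise KeyboardInterrupt
    return None


def fake_cleanup():
    calls["cleaned"] = True


result = poll_until_ready(fake_poll, fake_cleanup, pause=0.0)
print(result, calls)
```

Putting the cleanup in finally means the sockets are released on both the normal and the interrupted path.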
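On the Chapel side, we will do our best to keep pace with the API, as-is:
As it stands, the documentation does not help decide whether user code has any option to control whether calls to the .send() / .recv() methods are always implicitly blocking or not. Your code assumes it runs in blocking mode (which I always, and on principle, strongly advise against in any distributed-system design; blocking is a poor practice).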
While the C-level call zmq_send() may be a blocking call (depending on the socket type and flag arguments), it is desirable that a semantically-blocking call to Socket.send() allow other Chapel tasks to be scheduled on the OS thread as supported by the tasking layer. Internally, the ZMQ module uses non-blocking calls to zmq_send() and zmq_recv() to transfer data, and yields to the tasking layer via chpl_task_yield() when the call would otherwise block.
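The mechanism in the quote, a call that looks blocking to its caller but is built from non-blocking attempts plus a yield, can be sketched by analogy in Python. This is not the Chapel implementation: `blocking_recv` is a hypothetical helper, `queue.Queue` stands in for the socket, and `time.sleep(0)` stands in for chpl_task_yield().

```python
import queue
import threading
import time


def blocking_recv(inbox, yield_fn=lambda: time.sleep(0)):
    """Semantically blocking receive built from non-blocking attempts."""
    while True:
        try:
            return inbox.get_nowait()   # non-blocking attempt
        except queue.Empty:
            yield_fn()                  # would block: let other work run, retry


inbox = queue.Queue()
# Another thread delivers the message a little later.
threading.Timer(0.05, inbox.put, args=[b"back from chapel"]).start()
message = blocking_recv(inbox)
print(message)
```

The caller still waits for the message, but each failed attempt hands control back to the scheduler rather than parking the thread inside the receive call.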
use ZMQ;

var context: Context;
var dataBA = context.socket( ZMQ.REP ),
    heartB = context.socket( ZMQ.PUB );
var WAITms = 0;                              // setup as explicit int
dataBA.setsockopt( ZMQ.LINGER, WAITms );     // a must
heartB.setsockopt( ZMQ.LINGER, WAITms );     // a preventive step
dataBA.bind( "tcp://*:5555" );               // may reverse .bind()/.connect()
heartB.bind( "tcp://*:6666" );               // the python SUB side connects to this port
writeln( "INF: This Agent uses ZeroMQ v.", ZMQ.version() );
// /\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/
config var MAX_LOOPS = 120;                  // --MAX_LOOPS = 10 set on cmdline
var i = 0;
while ( i < MAX_LOOPS ) {
  // --------------------------------------- // .send HeartBeat
  heartB.send( "[chapel2python.HB]" );
  i += 1;
  writeln( "INF: Sent HeartBeat # ", i );
  // --------------------------------------- // .recv() / .send() pair
  var msg = dataBA.recv( string );           // .recv() from python
  // - - - - - - - - - - - - - - - - - - - - // - - - - -WILL-[BLOCK]!!!
  //                                         //           ( ref. src )
  writeln( "INF: [Chapel] got: ", msg );
  dataBA.send( "back from chapel" );         // .send() to python
}
writeln( "INF: MAX_LOOPS were exhausted,",
         " will exit-{} & .close()",
         " channels' sockets before",
         " [Chapel] exits to system."
         );
// /\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/
dataBA.close( WAITms );                      // explicit graceful termination
heartB.close( WAITms );                      // explicit graceful termination
context.deinit();                            // explicit context termination
                                             //   as not yet sure
                                             //   on auto-termination
                                             //   warranties
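@user3666197's answer discusses the ZeroMQ state machine well; I think the problem lies in how the Chapel ZMQ module serializes and transmits strings.
The Socket.send(string) and Socket.recv(string) methods in Chapel serialize a string by sending two messages. This was intended to match the pattern in the ZeroMQ Guide's "Minor Note on Strings"; however, as implemented, this serialization scheme is incorrect and incompatible with certain ZeroMQ socket patterns.
To send a string, Chapel sends one multipart message with two calls to zmq_send(): the first carries the string size with the ZMQ_SNDMORE flag, and the second carries the byte buffer; receiving works similarly. That means your one call to socket.recv(string) was actually making two calls to zmq_recv() under the hood. With the REQ/REP pattern, those two back-to-back zmq_recv() calls put the ZeroMQ state machine into an invalid state, hence the error message.
This is definitely a bug in Chapel's ZMQ module.
For reference, I am the author of the (definitely not bug-free) Chapel ZMQ module.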