REQ/REP & DEALER/ROUTER 用于双向异步 worker 处理
REQ/REP & DEALER/ROUTER for two-way asynchronous worker processing
我正在学习 ZeroMQ,刚刚学习了教程和一些示例。我使用 Node.js 作为我的主要环境( Python 最终被用来代替我的工人)。
尝试弄清楚如何创建一个完全异步的消息系统,让我的 API 推送任务(通过 REQ
套接字)到路由器,让经销商将消息传递给工作人员,处理消息并将其结果发送回我的客户(这是一条快速路线)。
我相信这个模式会像这样工作(还没有测试或正确实现代码,所以请把它作为一个概念大纲):
router.js
const zmq = require('zmq');;
const frontend = zmq.socket('router');
const backend = zmq.socket('dealer');
frontend.on('message', function() {
var args = Array.apply(null, arguments);
backend.send(args);
});
backend.on('message', function() {
var args = Array.apply(null, arguments);
frontend.send(args);
});
frontend.bindSync('tcp://*:5559');
backend.bindSync('tcp://*:5560');
client.js
var zmq = require('zmq'),
var express = require('express');
var app = express();
app.post('send', function(req, res) {
var client = zmq.socket('req');
// listen for responses from the server
client.on('message', function(data) {
console.log(data);
client.close();
});
// connect to the server port
client.connect('tcp://0.0.0.0:5454');
client.send('Request from ' + process.id);
});
app.listen('80');
worker.js
var zmq = require('zmq');
var server = zmq.socket('rep');
server.on('message', function(d){
server.send('Response from ' + process.id);
});
// bind to port 5454
server.bind('tcp://0.0.0.0:5454', function(err){
if (err){
console.error("something bad happened");
console.error( err.msg );
console.error( err.stack );
process.exit(0);
}
});
我不完全理解的是 ROUTER/DEALER
是否会处理将响应工作者发送到正确的客户端。同样在这种情况下,经销商会处理公平排队,因为我希望我的工作平均分配给工人。
我的客户端可以分布在许多不同的盒子中(负载均衡器 API 服务器),我的路由器将在其自己的服务器上,工作人员也将分布在多个盒子中。
似乎我在使用 DEALER/ROUTER
而我应该使用 XREQ
和 XREP
.
broker.js
var zmq = require('zmq');
var frontPort = 'tcp://127.0.0.1:5559';
var backPort = 'tcp://127.0.0.1:5560';
var frontSocket = zmq.socket('xrep');
var backSocket = zmq.socket('xreq');
frontSocket.identity = 'xrep_' + process.pid;
backSocket.identity = 'xreq_' + process.pid;
frontSocket.bind(frontPort, function (err) {
console.log('bound', frontPort);
});
frontSocket.on('message', function() {
//pass to back
console.log('router: sending to server', arguments[0].toString(), arguments[2].toString());
backSocket.send(Array.prototype.slice.call(arguments));
});
backSocket.bind(backPort, function (err) {
console.log('bound', backPort);
});
backSocket.on('message', function() {
//pass to front
console.log('dealer: sending to client', arguments[0].toString(), arguments[2].toString());
frontSocket.send(Array.prototype.slice.call(arguments));
});
console.log('Broker started...');
worker.js
var zmq = require('zmq');
var socket = zmq.socket('rep');
socket.identity = 'worker_' + process.pid;
socket.on('message', function(data) {
console.log(socket.identity + ': received ' + data.toString());
socket.send(data * 2);
});
socket.connect('tcp://127.0.0.1:5560', function(err) {
if (err) throw err;
console.log('server connected!');
});
console.log('Worker started...');
client.js
var zmq = require('zmq');
var socket = zmq.socket('req');
socket.identity = 'client_' + process.pid;
socket.on('message', function(data) {
console.log(socket.identity + ': answer data ' + data);
});
socket.connect('tcp://127.0.0.1:5559');
setInterval(function() {
var value = Math.floor(Math.random()*100);
console.log(socket.identity + ': asking ' + value);
socket.send(value);
}, 100);
console.log('Client started...');
我仍然不确定在每个 API 入站请求上打开连接是否安全。
在任何生产级应用中忘记 REQ/REP
,可能会陷入相互死锁
您可能会在 REQ/REP
正式可扩展通信模式中关于高风险相互 FSM-FSM 死锁的许多其他帖子中找到这个主题。
确定,XREQ/XREP == DEALER/ROUTER
(自 2011 年以来)
源代码删除了这背后的所有隐藏魔法,XREQ == DEALER
和 XREP == ROUTER
+++b/include/zmq.h
...
-#define ZMQ_XREQ 5
-#define ZMQ_XREP 6
+#define ZMQ_DEALER 5
+#define ZMQ_ROUTER 6
...
+#define ZMQ_XREQ ZMQ_DEALER /* Old alias, remove in 3.x */
+#define ZMQ_XREP ZMQ_ROUTER /* Old alias, remove in 3.x */
对于以后阅读本文的任何人,在我进一步的研究中我偶然发现了 Majordomo Protocol/pattern。这正是我要实现的。可以在此处阅读有关实施、优缺点的文档:https://rfc.zeromq.org/spec:18/MDP/. Here's the broker implementation: https://github.com/zeromq/majordomo
我正在学习 ZeroMQ,刚刚学习了教程和一些示例。我使用 Node.js 作为我的主要环境( Python 最终被用来代替我的工人)。
尝试弄清楚如何创建一个完全异步的消息系统,让我的 API 推送任务(通过 REQ
套接字)到路由器,让经销商将消息传递给工作人员,处理消息并将其结果发送回我的客户(这是一条快速路线)。
我相信这个模式会像这样工作(还没有测试或正确实现代码,所以请把它作为一个概念大纲):
router.js
const zmq = require('zmq');;
const frontend = zmq.socket('router');
const backend = zmq.socket('dealer');
frontend.on('message', function() {
var args = Array.apply(null, arguments);
backend.send(args);
});
backend.on('message', function() {
var args = Array.apply(null, arguments);
frontend.send(args);
});
frontend.bindSync('tcp://*:5559');
backend.bindSync('tcp://*:5560');
client.js
var zmq = require('zmq'),
var express = require('express');
var app = express();
app.post('send', function(req, res) {
var client = zmq.socket('req');
// listen for responses from the server
client.on('message', function(data) {
console.log(data);
client.close();
});
// connect to the server port
client.connect('tcp://0.0.0.0:5454');
client.send('Request from ' + process.id);
});
app.listen('80');
worker.js
var zmq = require('zmq');
var server = zmq.socket('rep');
server.on('message', function(d){
server.send('Response from ' + process.id);
});
// bind to port 5454
server.bind('tcp://0.0.0.0:5454', function(err){
if (err){
console.error("something bad happened");
console.error( err.msg );
console.error( err.stack );
process.exit(0);
}
});
我不完全理解的是 ROUTER/DEALER
是否会处理将响应工作者发送到正确的客户端。同样在这种情况下,经销商会处理公平排队,因为我希望我的工作平均分配给工人。
我的客户端可以分布在许多不同的盒子中(负载均衡器 API 服务器),我的路由器将在其自己的服务器上,工作人员也将分布在多个盒子中。
似乎我在使用 DEALER/ROUTER
而我应该使用 XREQ
和 XREP
.
broker.js
var zmq = require('zmq');
var frontPort = 'tcp://127.0.0.1:5559';
var backPort = 'tcp://127.0.0.1:5560';
var frontSocket = zmq.socket('xrep');
var backSocket = zmq.socket('xreq');
frontSocket.identity = 'xrep_' + process.pid;
backSocket.identity = 'xreq_' + process.pid;
frontSocket.bind(frontPort, function (err) {
console.log('bound', frontPort);
});
frontSocket.on('message', function() {
//pass to back
console.log('router: sending to server', arguments[0].toString(), arguments[2].toString());
backSocket.send(Array.prototype.slice.call(arguments));
});
backSocket.bind(backPort, function (err) {
console.log('bound', backPort);
});
backSocket.on('message', function() {
//pass to front
console.log('dealer: sending to client', arguments[0].toString(), arguments[2].toString());
frontSocket.send(Array.prototype.slice.call(arguments));
});
console.log('Broker started...');
worker.js
var zmq = require('zmq');
var socket = zmq.socket('rep');
socket.identity = 'worker_' + process.pid;
socket.on('message', function(data) {
console.log(socket.identity + ': received ' + data.toString());
socket.send(data * 2);
});
socket.connect('tcp://127.0.0.1:5560', function(err) {
if (err) throw err;
console.log('server connected!');
});
console.log('Worker started...');
client.js
var zmq = require('zmq');
var socket = zmq.socket('req');
socket.identity = 'client_' + process.pid;
socket.on('message', function(data) {
console.log(socket.identity + ': answer data ' + data);
});
socket.connect('tcp://127.0.0.1:5559');
setInterval(function() {
var value = Math.floor(Math.random()*100);
console.log(socket.identity + ': asking ' + value);
socket.send(value);
}, 100);
console.log('Client started...');
我仍然不确定在每个 API 入站请求上打开连接是否安全。
在任何生产级应用中忘记 REQ/REP
,可能会陷入相互死锁
您可能会在 REQ/REP
正式可扩展通信模式中关于高风险相互 FSM-FSM 死锁的许多其他帖子中找到这个主题。
确定,XREQ/XREP == DEALER/ROUTER
(自 2011 年以来)
源代码删除了这背后的所有隐藏魔法,XREQ == DEALER
和 XREP == ROUTER
+++b/include/zmq.h
...
-#define ZMQ_XREQ 5
-#define ZMQ_XREP 6
+#define ZMQ_DEALER 5
+#define ZMQ_ROUTER 6
...
+#define ZMQ_XREQ ZMQ_DEALER /* Old alias, remove in 3.x */
+#define ZMQ_XREP ZMQ_ROUTER /* Old alias, remove in 3.x */
对于以后阅读本文的任何人,在我进一步的研究中我偶然发现了 Majordomo Protocol/pattern。这正是我要实现的。可以在此处阅读有关实施、优缺点的文档:https://rfc.zeromq.org/spec:18/MDP/. Here's the broker implementation: https://github.com/zeromq/majordomo