NodeJS - 响应流
NodeJS - Response stream
我使用 Sails.js 使用 NodeJS 构建了一个简单的 API 端点。
当有人访问我的 API 端点时,服务器开始等待数据,每当出现新数据时,他都会使用套接字进行广播。每个客户端都应该根据他的用户输入接收他自己的数据流。
var Cap = require('cap').Cap;
collect: function (req, res) {
var iface = req.param("ip");
var c = new Cap(),
device = Cap.findDevice(ip);
c.on('data', function(myData) {
sails.sockets.blast('message', {"host": myData});
});
});
响应未完成(我从未发送 res.json() - 实际发生的是浏览器继续加载 - 但上述功能有效)。
2 个问题:
我正在尝试从我的客户端(使用 RxJS)订阅和取消订阅这个 API 端点。当我订阅时,我开始通过套接字接收数据 - 但我无法取消订阅 API 端点(浏览器期望请求完成)。
每个客户端都应该根据请求的IP参数订阅自己的socket room(见更新代码)。目前正在向大家群发消息
我如何创建 stream/service-like API 端点 Sails.js 以根据每个用户的输入向其发送新数据?
我的目标是能够从每个客户端订阅/取消订阅此 API 端点。
修改后的答案
假设您的 API 端点在 config/routes.js
中定义如下:
...
'get /collect': 'SomeController.collectSubscribe',
'delete /collect': 'SomeController.collectUnsubscribe',
由于每个 Cap
实例都绑定到一个设备,我们需要为每个订阅一个实例。我们没有使用 sails join
/leave
方法,而是在内存中跟踪 Cap
个实例,并且只 broadcast
到请求套接字的 id。这是有效的,因为默认情况下 Sails 套接字订阅了它们自己的 ID。
在api/controllers/SomeController.js
中:
// In order for the `Cap` instances to persist after `collectSubscribe` finishes, we store them all in an Object, associated with which socket the were created for.
var caps = {/* req.socket.id: <instance of Cap>, */};
module.exports = {
...
collectSubscribe: function(req, res) {
if (!res.isSocket) return res.badRequest("I need a websocket! Help!");
if (!!caps[req.socket.id]) return res.badRequest("Dude, you are already subscribed.");
caps[req.socket.id] = new Cap();
var c = caps[req.socket.id]; // remember that `c` is a reference to our new `Cap`, not a copy.
var device = c.findDevice(req.param('ip'));
c.open(device, ...);
c.on('data', function(myData) {
sails.sockets.broadcast(req.socket.id, 'message', {host: myData});
});
return res.ok();
},
collectUnsubscribe: function(req, res) {
if (!res.isSocket) return res.badRequest("I need a websocket! Help!");
if (!caps[req.socket.id]) return res.badRequest("I can't unsubscribe you unless you actually subscribe first.");
caps[req.socket.id].removeAllListeners('data');
delete caps[req.socket.id];
return res.ok();
}
}
基本上,它是这样的:当浏览器请求触发 collectSubscribe
时,一个新的 Cap
实例会侦听提供的 IP。当浏览器触发 collectUnsubscribe
时,服务器检索 Cap
实例,告诉它停止侦听,然后将其删除。
生产注意事项:请注意 Cap
s 的列表不是持久的(因为它存储在内存中而不是数据库中)!因此,如果您的服务器关闭并重新启动(由于闪电风暴等),该列表将被清除,但考虑到所有 websocket 连接都将被丢弃,我认为没有必要为此担心。
旧答案,留作参考
您可以使用 sails.sockets.join(req, room)
和 sails.sockets.leave(req, room)
来管理插座室。本质上,您有一个名为 "collect" 的房间,只有加入该房间的套接字才会收到 sails.sockets.broadcast(room, eventName, data)
.
有关如何使用 sails.sockets
here.
的更多信息
在api/controllers/SomeController.js
中:
collectSubscribe: function(req, res) {
if (!res.isSocket) return res.badRequest();
sails.sockets.join(req, 'collect');
return res.ok();
},
collectUnsubscribe: function(req, res) {
if (!res.isSocket) return res.badRequest();
sails.sockets.leave(req, 'collect');
return res.ok();
}
最后,我们需要告诉服务器向我们的 'collect'
房间广播消息。
请注意,这只需要发生 一次,因此您可以在 config/
目录下的文件中执行此操作。
对于这个例子,我将把它放在 config/sockets.js
module.exports = {
// ...
};
c.on('data', function(myData) {
var eventName = 'message';
var data = {host: myData};
sails.sockets.broadcast('collect', eventName, data);
});
我假设 c
可以在这里访问;如果没有,您可以将其定义为 sails.c = ...
以使其可在全球范围内访问。
我使用 Sails.js 使用 NodeJS 构建了一个简单的 API 端点。
当有人访问我的 API 端点时,服务器开始等待数据,每当出现新数据时,他都会使用套接字进行广播。每个客户端都应该根据他的用户输入接收他自己的数据流。
var Cap = require('cap').Cap;
collect: function (req, res) {
var iface = req.param("ip");
var c = new Cap(),
device = Cap.findDevice(ip);
c.on('data', function(myData) {
sails.sockets.blast('message', {"host": myData});
});
});
响应未完成(我从未发送 res.json() - 实际发生的是浏览器继续加载 - 但上述功能有效)。
2 个问题:
我正在尝试从我的客户端(使用 RxJS)订阅和取消订阅这个 API 端点。当我订阅时,我开始通过套接字接收数据 - 但我无法取消订阅 API 端点(浏览器期望请求完成)。
每个客户端都应该根据请求的IP参数订阅自己的socket room(见更新代码)。目前正在向大家群发消息
我如何创建 stream/service-like API 端点 Sails.js 以根据每个用户的输入向其发送新数据?
我的目标是能够从每个客户端订阅/取消订阅此 API 端点。
修改后的答案
假设您的 API 端点在 config/routes.js
中定义如下:
...
'get /collect': 'SomeController.collectSubscribe',
'delete /collect': 'SomeController.collectUnsubscribe',
由于每个 Cap
实例都绑定到一个设备,我们需要为每个订阅一个实例。我们没有使用 sails join
/leave
方法,而是在内存中跟踪 Cap
个实例,并且只 broadcast
到请求套接字的 id。这是有效的,因为默认情况下 Sails 套接字订阅了它们自己的 ID。
在api/controllers/SomeController.js
中:
// In order for the `Cap` instances to persist after `collectSubscribe` finishes, we store them all in an Object, associated with which socket the were created for.
var caps = {/* req.socket.id: <instance of Cap>, */};
module.exports = {
...
collectSubscribe: function(req, res) {
if (!res.isSocket) return res.badRequest("I need a websocket! Help!");
if (!!caps[req.socket.id]) return res.badRequest("Dude, you are already subscribed.");
caps[req.socket.id] = new Cap();
var c = caps[req.socket.id]; // remember that `c` is a reference to our new `Cap`, not a copy.
var device = c.findDevice(req.param('ip'));
c.open(device, ...);
c.on('data', function(myData) {
sails.sockets.broadcast(req.socket.id, 'message', {host: myData});
});
return res.ok();
},
collectUnsubscribe: function(req, res) {
if (!res.isSocket) return res.badRequest("I need a websocket! Help!");
if (!caps[req.socket.id]) return res.badRequest("I can't unsubscribe you unless you actually subscribe first.");
caps[req.socket.id].removeAllListeners('data');
delete caps[req.socket.id];
return res.ok();
}
}
基本上,它是这样的:当浏览器请求触发 collectSubscribe
时,一个新的 Cap
实例会侦听提供的 IP。当浏览器触发 collectUnsubscribe
时,服务器检索 Cap
实例,告诉它停止侦听,然后将其删除。
生产注意事项:请注意 Cap
s 的列表不是持久的(因为它存储在内存中而不是数据库中)!因此,如果您的服务器关闭并重新启动(由于闪电风暴等),该列表将被清除,但考虑到所有 websocket 连接都将被丢弃,我认为没有必要为此担心。
旧答案,留作参考
您可以使用 sails.sockets.join(req, room)
和 sails.sockets.leave(req, room)
来管理插座室。本质上,您有一个名为 "collect" 的房间,只有加入该房间的套接字才会收到 sails.sockets.broadcast(room, eventName, data)
.
有关如何使用 sails.sockets
here.
在api/controllers/SomeController.js
中:
collectSubscribe: function(req, res) {
if (!res.isSocket) return res.badRequest();
sails.sockets.join(req, 'collect');
return res.ok();
},
collectUnsubscribe: function(req, res) {
if (!res.isSocket) return res.badRequest();
sails.sockets.leave(req, 'collect');
return res.ok();
}
最后,我们需要告诉服务器向我们的 'collect'
房间广播消息。
请注意,这只需要发生 一次,因此您可以在 config/
目录下的文件中执行此操作。
对于这个例子,我将把它放在 config/sockets.js
module.exports = {
// ...
};
c.on('data', function(myData) {
var eventName = 'message';
var data = {host: myData};
sails.sockets.broadcast('collect', eventName, data);
});
我假设 c
可以在这里访问;如果没有,您可以将其定义为 sails.c = ...
以使其可在全球范围内访问。