对 NodeJs 中的 Stream 实现感到困惑
Confused about Stream implementation in NodeJs
我正在尝试实现一个通过套接字发送和接收文件的协议。协议已指定,我无法更改它。
我是 NodeJs 的新手,这就是我尝试实现它的方式。
我将写入双工流,并将文件传输到其中。然后将其通过管道传输到套接字以发送数据。
困惑来自我应该在哪里阅读这个,以及在哪里写那个。如何知道读取文件已完成,以及如何告诉套接字已完成。文档对我来说不是很清楚,谷歌搜索增加了更多的混乱:)
如有任何帮助,我们将不胜感激。
P.S。回家后我会添加自己的样品,我现在没有。
编辑
在@MattHarrison 的回答后,我将代码更改为:
var stream = require('stream');
var util = require('util');
var bufferpack = require('bufferpack');
var fs = require('fs');
var net = require('net');
var MyProtocolStream = function () {
this.writtenHeader = false; // have we written the header yet?
stream.Transform.call(this);
};
util.inherits(MyProtocolStream, stream.Transform);
MyProtocolStream.prototype._transform = function (chunk, encoding, callback) {
if (!this.writtenHeader) {
this.push('==== HEADER ====\n'); // if we've not, send the header first
}
// Can this function be interrupted at this very line?
// Then another _transform comes in and pushes its own data to socket
// Corrupted data maybe then?
// How to prevent this behavior? Buffering whole file before sending?
var self = this;
// I put a random timeout to simulate overlapped calls
// Can this happen in real world?
setTimeout(function () {
self.push(chunk); // send the incoming file chunks along as-is
callback();
}, Math.random()*10);
};
MyProtocolStream.prototype._flush = function (callback) {
this.push('==== FOOTER ====\n'); // Just before the stream closes, send footer
callback();
};
var file = '/tmp/a';
var server = net.createServer(function (sck) {
sck.addr = sck.remoteAddress;
console.log('Client connected - ' + sck.addr);
fs.createReadStream('/tmp/a').pipe(new MyProtocolStream()).pipe(sck);
fs.createReadStream('/tmp/b').pipe(new MyProtocolStream()).pipe(sck);
fs.createReadStream('/tmp/c').pipe(new MyProtocolStream()).pipe(sck);
sck.on('close', function () {
console.log('Client disconnected - ' + this.addr);
})
});
server.listen(22333, function () {
console.log('Server started on ' + 22333)
});
在 _transform
中查看我的评论。
这是一个great resource on streams。
至于套接字用法,当您 send/get 套接字消息时,您发送类型和有效负载。因此,您的通用套接字流可能是在您有数据要发送的同时发送类型为 'file_data' 的消息和内容,最后发送类型为 'eof' 的消息(用于文件结尾)和一个空有效负载。
我不确定您正在尝试实施的协议的具体细节,但以下应该为您提供了一个可以适应您需要的良好模式。
我虚构的协议
当客户端套接字连接到我的 TCP 服务器时,我想向他们发送一个文件。但首先我要发送一个header。在文件的末尾,在流结束之前,我还想发送一个header。所以写入套接字的数据看起来像:
==== HEADER ====
[FILE CONTENTS]
==== FOOTER ====
实现转换流
我只想转换 来自可读流的数据。注意 transform 是这里的关键字。我可以为此使用 Transform
流。
创建转换流时,您可以覆盖两个方法:_transform
和_flush
。 _transform
随着可读流的每个块的到来而被调用。您可以更改数据、缓冲数据或其他任何内容。 _flush
在可读的所有数据完成后立即调用。你可以在这里做更多的清理,或者写出最后一点数据。
var Stream = require('stream');
var Util = require('util');
var MyProtocolStream = function () {
this.writtenHeader = false; // have we written the header yet?
Stream.Transform.call(this);
};
Util.inherits(MyProtocolStream, Stream.Transform);
MyProtocolStream.prototype._transform = function (chunk, encoding, callback) {
if (!this.writtenHeader) {
this.push('==== HEADER ====\n'); // if we've not, send the header first
this.writtenHeader = true;
}
this.push(chunk); // send the incoming file chunks along as-is
callback();
};
MyProtocolStream.prototype._flush = function (callback) {
this.push('==== FOOTER ====\n'); // Just before the stream closes, send footer
callback();
};
使用 MyProtocolStream
所以现在我有一个流可以执行我想要的操作,我可以简单地将一个文件(或任何可读流)通过我的自定义转换流传输到任何其他可写流(例如套接字)。
var Fs = require('fs');
var Net = require('net');
var server = Net.createServer(function (socket) {
Fs.createReadStream('./example.txt')
.pipe(new MyProtocolStream())
.pipe(socket);
});
server.listen(8000);
正在测试
我可以通过向 example.txt
添加一些内容来测试它:
This is a line
This is another line
This is the last line
我可以启动我的服务器然后连接 telnet/nc:
$ telnet 127.0.0.1 8000
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
==== HEADER ====
This is a line
This is another line
This is the last line
==== FOOTER ====
Connection closed by foreign host.
那么双工流呢?
双工流是嵌入在一个流中的两个流。数据来自一个,而您将完全不同的数据写入另一个。它用于与另一个实体(例如 TCP 套接字)进行双向通信的地方。在这个例子中,我们不需要双工流,因为数据只在一个方向流动:
file -> MyProtocolStream -> socket
了解更多
正如 Meir 在另一个答案中指出的那样,Substack's stream handbook 是流的规范(也是最好的)资源以及官方文档。如果您通读它们并自己实现这些示例,您将了解有关流的所有知识。
通过单个套接字连续发送多个文件
如果您想将多个这些转换流的输出写入单个可写端,pipe()
不适合您。一旦 EOF 来自单个流,上游可写(套接字)也将被关闭。在这种情况下,也无法保证数据事件的顺序。因此,您需要通过监听 data
/end
事件来手动聚合流,开始读取一个流后另一个流完成:
var server = Net.createServer(function (socket) {
var incoming1 = Fs.createReadStream('./example.txt')
.pipe(new MyProtocolStream());
var incoming2 = Fs.createReadStream('./example.txt')
.pipe(new MyProtocolStream());
var readStream = function (stream, callback) {
stream.on('data', socket.write.bind(socket));
stream.on('end', callback);
};
readStream(incoming1, function () {
readStream(incoming2, function () {
socket.end();
});
});
});
如果你是嵌套的 callback-averse,你也可以使用 promises:
var server = Net.createServer(function (socket) {
var incoming1 = Fs.createReadStream('./example.txt')
.pipe(new MyProtocolStream());
var incoming2 = Fs.createReadStream('./example.txt')
.pipe(new MyProtocolStream());
var incoming3 = Fs.createReadStream('./example.txt')
.pipe(new MyProtocolStream());
var readStream = function (stream) {
return new Promise(function (resolve, reject) {
stream.on('data', socket.write.bind(socket));
stream.on('end', resolve);
});
};
readStream(incoming1)
.then(function () {
return readStream(incoming2);
})
.then(function () {
return readStream(incoming3);
})
.then(function () {
socket.end();
});
});
我正在尝试实现一个通过套接字发送和接收文件的协议。协议已指定,我无法更改它。
我是 NodeJs 的新手,这就是我尝试实现它的方式。
我将写入双工流,并将文件传输到其中。然后将其通过管道传输到套接字以发送数据。
困惑来自我应该在哪里阅读这个,以及在哪里写那个。如何知道读取文件已完成,以及如何告诉套接字已完成。文档对我来说不是很清楚,谷歌搜索增加了更多的混乱:)
如有任何帮助,我们将不胜感激。
P.S。回家后我会添加自己的样品,我现在没有。
编辑
在@MattHarrison 的回答后,我将代码更改为:
var stream = require('stream');
var util = require('util');
var bufferpack = require('bufferpack');
var fs = require('fs');
var net = require('net');
var MyProtocolStream = function () {
this.writtenHeader = false; // have we written the header yet?
stream.Transform.call(this);
};
util.inherits(MyProtocolStream, stream.Transform);
MyProtocolStream.prototype._transform = function (chunk, encoding, callback) {
if (!this.writtenHeader) {
this.push('==== HEADER ====\n'); // if we've not, send the header first
}
// Can this function be interrupted at this very line?
// Then another _transform comes in and pushes its own data to socket
// Corrupted data maybe then?
// How to prevent this behavior? Buffering whole file before sending?
var self = this;
// I put a random timeout to simulate overlapped calls
// Can this happen in real world?
setTimeout(function () {
self.push(chunk); // send the incoming file chunks along as-is
callback();
}, Math.random()*10);
};
MyProtocolStream.prototype._flush = function (callback) {
this.push('==== FOOTER ====\n'); // Just before the stream closes, send footer
callback();
};
var file = '/tmp/a';
var server = net.createServer(function (sck) {
sck.addr = sck.remoteAddress;
console.log('Client connected - ' + sck.addr);
fs.createReadStream('/tmp/a').pipe(new MyProtocolStream()).pipe(sck);
fs.createReadStream('/tmp/b').pipe(new MyProtocolStream()).pipe(sck);
fs.createReadStream('/tmp/c').pipe(new MyProtocolStream()).pipe(sck);
sck.on('close', function () {
console.log('Client disconnected - ' + this.addr);
})
});
server.listen(22333, function () {
console.log('Server started on ' + 22333)
});
在 _transform
中查看我的评论。
这是一个great resource on streams。
至于套接字用法,当您 send/get 套接字消息时,您发送类型和有效负载。因此,您的通用套接字流可能是在您有数据要发送的同时发送类型为 'file_data' 的消息和内容,最后发送类型为 'eof' 的消息(用于文件结尾)和一个空有效负载。
我不确定您正在尝试实施的协议的具体细节,但以下应该为您提供了一个可以适应您需要的良好模式。
我虚构的协议
当客户端套接字连接到我的 TCP 服务器时,我想向他们发送一个文件。但首先我要发送一个header。在文件的末尾,在流结束之前,我还想发送一个header。所以写入套接字的数据看起来像:
==== HEADER ====
[FILE CONTENTS]
==== FOOTER ====
实现转换流
我只想转换 来自可读流的数据。注意 transform 是这里的关键字。我可以为此使用 Transform
流。
创建转换流时,您可以覆盖两个方法:_transform
和_flush
。 _transform
随着可读流的每个块的到来而被调用。您可以更改数据、缓冲数据或其他任何内容。 _flush
在可读的所有数据完成后立即调用。你可以在这里做更多的清理,或者写出最后一点数据。
var Stream = require('stream');
var Util = require('util');
var MyProtocolStream = function () {
this.writtenHeader = false; // have we written the header yet?
Stream.Transform.call(this);
};
Util.inherits(MyProtocolStream, Stream.Transform);
MyProtocolStream.prototype._transform = function (chunk, encoding, callback) {
if (!this.writtenHeader) {
this.push('==== HEADER ====\n'); // if we've not, send the header first
this.writtenHeader = true;
}
this.push(chunk); // send the incoming file chunks along as-is
callback();
};
MyProtocolStream.prototype._flush = function (callback) {
this.push('==== FOOTER ====\n'); // Just before the stream closes, send footer
callback();
};
使用 MyProtocolStream
所以现在我有一个流可以执行我想要的操作,我可以简单地将一个文件(或任何可读流)通过我的自定义转换流传输到任何其他可写流(例如套接字)。
var Fs = require('fs');
var Net = require('net');
var server = Net.createServer(function (socket) {
Fs.createReadStream('./example.txt')
.pipe(new MyProtocolStream())
.pipe(socket);
});
server.listen(8000);
正在测试
我可以通过向 example.txt
添加一些内容来测试它:
This is a line
This is another line
This is the last line
我可以启动我的服务器然后连接 telnet/nc:
$ telnet 127.0.0.1 8000
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
==== HEADER ====
This is a line
This is another line
This is the last line
==== FOOTER ====
Connection closed by foreign host.
那么双工流呢?
双工流是嵌入在一个流中的两个流。数据来自一个,而您将完全不同的数据写入另一个。它用于与另一个实体(例如 TCP 套接字)进行双向通信的地方。在这个例子中,我们不需要双工流,因为数据只在一个方向流动:
file -> MyProtocolStream -> socket
了解更多
正如 Meir 在另一个答案中指出的那样,Substack's stream handbook 是流的规范(也是最好的)资源以及官方文档。如果您通读它们并自己实现这些示例,您将了解有关流的所有知识。
通过单个套接字连续发送多个文件
如果您想将多个这些转换流的输出写入单个可写端,pipe()
不适合您。一旦 EOF 来自单个流,上游可写(套接字)也将被关闭。在这种情况下,也无法保证数据事件的顺序。因此,您需要通过监听 data
/end
事件来手动聚合流,开始读取一个流后另一个流完成:
var server = Net.createServer(function (socket) {
var incoming1 = Fs.createReadStream('./example.txt')
.pipe(new MyProtocolStream());
var incoming2 = Fs.createReadStream('./example.txt')
.pipe(new MyProtocolStream());
var readStream = function (stream, callback) {
stream.on('data', socket.write.bind(socket));
stream.on('end', callback);
};
readStream(incoming1, function () {
readStream(incoming2, function () {
socket.end();
});
});
});
如果你是嵌套的 callback-averse,你也可以使用 promises:
var server = Net.createServer(function (socket) {
var incoming1 = Fs.createReadStream('./example.txt')
.pipe(new MyProtocolStream());
var incoming2 = Fs.createReadStream('./example.txt')
.pipe(new MyProtocolStream());
var incoming3 = Fs.createReadStream('./example.txt')
.pipe(new MyProtocolStream());
var readStream = function (stream) {
return new Promise(function (resolve, reject) {
stream.on('data', socket.write.bind(socket));
stream.on('end', resolve);
});
};
readStream(incoming1)
.then(function () {
return readStream(incoming2);
})
.then(function () {
return readStream(incoming3);
})
.then(function () {
socket.end();
});
});