在写入文件之前对数据流进行排序
Sorting a data stream before writing to file in nodejs
我有一个可能包含多达 1M 条记录的输入文件,每条记录如下所示
field 1 field 2 field3 \n
我想读取此输入文件并根据 field3
对其进行排序,然后再将其写入另一个文件。
这是我目前所拥有的
var fs = require('fs'),
readline = require('readline'),
stream = require('stream');
var start = Date.now();
var outstream = new stream;
outstream.readable = true;
outstream.writable = true;
var rl = readline.createInterface({
input: fs.createReadStream('cross.txt'),
output: outstream,
terminal: false
});
rl.on('line', function(line) {
//var tmp = line.split("\t").reverse().join('\t') + '\n';
//fs.appendFileSync("op_rev.txt", tmp );
// this logic to reverse and then sort is too slow
});
rl.on('close', function() {
var closetime = Date.now();
console.log('Read entirefile. ', (closetime - start)/1000, ' secs');
});
我基本上卡在这一点上了,我只能从一个文件读取并写入另一个文件,有没有办法在写入之前有效地排序这些数据
我遇到了非常相似的问题,需要执行 external sort。
我想通了,在浪费了一些时间之后,我可以将数据加载到数据库中,然后从中查询出所需的数据。
即使插入没有排序也没关系,只要我的查询结果可以。
希望它也对你有用。
为了将数据插入数据库,节点上有很多工具可以执行此类任务。我有 this pet project 做类似的工作。
我也相信,如果您 search 这个主题,您会找到更多信息。
祝你好运。
您有两种选择,具体取决于要处理的数据量。 (3 列的 1M 记录数并不能说明实际数据量)
加载内存中的数据,就地排序
var lines = [];
rl.on('line', function(line) {
lines.push(line.split("\t").reverse());
});
rl.on('close', function() {
lines.sort(function(a, b) { return compare(a[0], b[0]); });
// write however you want
fs.writeFileSync(
fileName,
lines.map(function(x) { return x.join("\t"); }).join("\n")
);
function compare(a, b) {
if (a < b) return -1;
if (a > b) return 1;
return 0;
}
});
在持久数据库中加载数据,按顺序读取
使用您选择的数据库引擎(例如 nedb,用于 nodejs 的纯 javascript 数据库)
EDIT: NeDB 似乎将整个数据库保存在内存中,文件只是数据的持久副本。我们将不得不寻找另一种实现方式。 TingoDB 看起来很有希望。
// This code is only to give an idea, not tested in any way
var Datastore = require('nedb');
var db = new Datastore({
filename: 'path/to/temp/datafile',
autoload: true
});
rl.on('line', function(line) {
var tmp = line.split("\t").reverse();
db.insert({
field0: tmp[0],
field1: tmp[1],
field2: tmp[2]
});
});
rl.on('close', function() {
var cursor = db.find({})
.sort({ field0: 1 }); // sort by field0, ascending
var PAGE_SIZE = 1000;
paginate(0);
function paginate(i) {
cursor.skip(i).take(PAGE_SIZE).exec(function(err, docs) {
// handle errors
var tmp = docs.map(function(o) {
return o.field0 + "\t" + o.field1 + "\t" + o.field2 + "\n";
});
fs.appendFileSync("op_rev.txt", tmp.join(""));
if (docs.length >= PAGE_SIZE) {
paginate(i + PAGE_SIZE);
} else {
// cleanup temp database
}
});
}
});
你可以利用流来做这样的事情。有一些 NPM 模块会有所帮助——首先通过 运行
包含它们
npm install sort-stream csv-parse stream-transform
从命令行。
然后:
var fs = require('fs');
var sort = require('sort-stream');
var parse = require('csv-parse');
var transform = require('stream-transform');
// Create a readble stream from the input file.
fs.createReadStream('./cross.txt')
// Use `csv-parse` to parse the input using a tab character (\t) as the
// delimiter. This produces a record for each row which is an array of
// field values.
.pipe(parse({
delimiter: '\t'
}))
// Use `sort-stream` to sort the parsed records on the third field.
.pipe(sort(function (a, b) {
return a[2].localeCompare(b[2]);
}))
// Use `stream-transform` to transform each record (an array of fields) into
// a single tab-delimited string to be output to our destination text file.
.pipe(transform(function(row) {
return row.join('\t') + '\r';
}))
// And finally, output those strings to our destination file.
.pipe(fs.createWriteStream('./cross_sorted.txt'));
DB
和 sort-stream
是很好的解决方案, 但是 DB 可能有点矫枉过正,我认为 sort-stream
最终只是对整个文件进行排序在内存数组中(在 through
结束回调上),所以我认为与原始解决方案相比,性能将大致相同。
(但我没有 运行 任何基准,所以我可能是错的)。
所以,为了简单起见,我将提供另一个解决方案:)
编辑:
我很好奇这会有多大差异,所以我 运行 一些基准。
结果甚至让我感到惊讶,事实证明 sort -k3,3
解决方案到目前为止更好, 比原始解决方案快 10 倍(简单的数组排序),而nedb
和 sort-stream
解决方案比原始解决方案至少慢 x18 倍(即至少比 sort -k3,3
慢 x180 倍)。
(参见下面的基准测试结果)
如果在 *nix 机器上 (Unix, Linux, Mac, ...) 你可以简单地使用
sort -k 3,3 yourInputFile > op_rev.txt
让 OS 为您排序。
您可能会获得更好的性能,因为排序是本机完成的。
或者,如果您想在 Node 中处理排序后的输出:
var util = require('util'),
spawn = require('child_process').spawn,
sort = spawn('sort', ['-k3,3', './test.tsv']);
sort.stdout.on('data', function (data) {
// process data
data.toString()
.split('\n')
.map(line => line.split("\t"))
.forEach(record => console.info(`Record: ${record}`));
});
sort.on('exit', function (code) {
if (code) {
// handle error
}
console.log('Done');
});
// optional
sort.stderr.on('data', function (data) {
// handle error...
console.log('stderr: ' + data);
});
希望这对您有所帮助:)
编辑: 添加一些基准细节。
我很好奇这会有多大的不同,所以我 运行 一些基准。
这是结果(运行 在 MacBook Pro 上):
sort1 使用一种直接的方法,将记录排序为 in-memory array
.
平均时间:35.6s(基线)
sort2 按照 Joe Krill 的建议使用 sort-stream
。
平均时间:11.1m(大约慢 x18.7 倍)
(不知道为什么,我没有深究。)
根据 Tamas Hegedus 的建议,sort3 使用 nedb
。
时间:大约16m(大约慢x27倍)
sort4 仅通过在终端
中执行sort -k 3,3 input.txt > out4.txt
进行排序
平均时间:1.2s(大约 x30 倍)
sort5 使用 sort -k3,3
,并处理发送到标准输出的响应
平均时间:3.65s(大约 x9.7 倍)
我有一个可能包含多达 1M 条记录的输入文件,每条记录如下所示
field 1 field 2 field3 \n
我想读取此输入文件并根据 field3
对其进行排序,然后再将其写入另一个文件。
这是我目前所拥有的
var fs = require('fs'),
readline = require('readline'),
stream = require('stream');
var start = Date.now();
var outstream = new stream;
outstream.readable = true;
outstream.writable = true;
var rl = readline.createInterface({
input: fs.createReadStream('cross.txt'),
output: outstream,
terminal: false
});
rl.on('line', function(line) {
//var tmp = line.split("\t").reverse().join('\t') + '\n';
//fs.appendFileSync("op_rev.txt", tmp );
// this logic to reverse and then sort is too slow
});
rl.on('close', function() {
var closetime = Date.now();
console.log('Read entirefile. ', (closetime - start)/1000, ' secs');
});
我基本上卡在这一点上了,我只能从一个文件读取并写入另一个文件,有没有办法在写入之前有效地排序这些数据
我遇到了非常相似的问题,需要执行 external sort。
我想通了,在浪费了一些时间之后,我可以将数据加载到数据库中,然后从中查询出所需的数据。
即使插入没有排序也没关系,只要我的查询结果可以。
希望它也对你有用。
为了将数据插入数据库,节点上有很多工具可以执行此类任务。我有 this pet project 做类似的工作。
我也相信,如果您 search 这个主题,您会找到更多信息。
祝你好运。
您有两种选择,具体取决于要处理的数据量。 (3 列的 1M 记录数并不能说明实际数据量)
加载内存中的数据,就地排序
var lines = [];
rl.on('line', function(line) {
lines.push(line.split("\t").reverse());
});
rl.on('close', function() {
lines.sort(function(a, b) { return compare(a[0], b[0]); });
// write however you want
fs.writeFileSync(
fileName,
lines.map(function(x) { return x.join("\t"); }).join("\n")
);
function compare(a, b) {
if (a < b) return -1;
if (a > b) return 1;
return 0;
}
});
在持久数据库中加载数据,按顺序读取
使用您选择的数据库引擎(例如 nedb,用于 nodejs 的纯 javascript 数据库)
EDIT: NeDB 似乎将整个数据库保存在内存中,文件只是数据的持久副本。我们将不得不寻找另一种实现方式。 TingoDB 看起来很有希望。
// This code is only to give an idea, not tested in any way
var Datastore = require('nedb');
var db = new Datastore({
filename: 'path/to/temp/datafile',
autoload: true
});
rl.on('line', function(line) {
var tmp = line.split("\t").reverse();
db.insert({
field0: tmp[0],
field1: tmp[1],
field2: tmp[2]
});
});
rl.on('close', function() {
var cursor = db.find({})
.sort({ field0: 1 }); // sort by field0, ascending
var PAGE_SIZE = 1000;
paginate(0);
function paginate(i) {
cursor.skip(i).take(PAGE_SIZE).exec(function(err, docs) {
// handle errors
var tmp = docs.map(function(o) {
return o.field0 + "\t" + o.field1 + "\t" + o.field2 + "\n";
});
fs.appendFileSync("op_rev.txt", tmp.join(""));
if (docs.length >= PAGE_SIZE) {
paginate(i + PAGE_SIZE);
} else {
// cleanup temp database
}
});
}
});
你可以利用流来做这样的事情。有一些 NPM 模块会有所帮助——首先通过 运行
包含它们npm install sort-stream csv-parse stream-transform
从命令行。
然后:
var fs = require('fs');
var sort = require('sort-stream');
var parse = require('csv-parse');
var transform = require('stream-transform');
// Create a readble stream from the input file.
fs.createReadStream('./cross.txt')
// Use `csv-parse` to parse the input using a tab character (\t) as the
// delimiter. This produces a record for each row which is an array of
// field values.
.pipe(parse({
delimiter: '\t'
}))
// Use `sort-stream` to sort the parsed records on the third field.
.pipe(sort(function (a, b) {
return a[2].localeCompare(b[2]);
}))
// Use `stream-transform` to transform each record (an array of fields) into
// a single tab-delimited string to be output to our destination text file.
.pipe(transform(function(row) {
return row.join('\t') + '\r';
}))
// And finally, output those strings to our destination file.
.pipe(fs.createWriteStream('./cross_sorted.txt'));
DB
和 sort-stream
是很好的解决方案, 但是 DB 可能有点矫枉过正,我认为 sort-stream
最终只是对整个文件进行排序在内存数组中(在 through
结束回调上),所以我认为与原始解决方案相比,性能将大致相同。
(但我没有 运行 任何基准,所以我可能是错的)。
所以,为了简单起见,我将提供另一个解决方案:)
编辑: 我很好奇这会有多大差异,所以我 运行 一些基准。
结果甚至让我感到惊讶,事实证明 sort -k3,3
解决方案到目前为止更好, 比原始解决方案快 10 倍(简单的数组排序),而nedb
和 sort-stream
解决方案比原始解决方案至少慢 x18 倍(即至少比 sort -k3,3
慢 x180 倍)。
(参见下面的基准测试结果)
如果在 *nix 机器上 (Unix, Linux, Mac, ...) 你可以简单地使用
sort -k 3,3 yourInputFile > op_rev.txt
让 OS 为您排序。
您可能会获得更好的性能,因为排序是本机完成的。
或者,如果您想在 Node 中处理排序后的输出:
var util = require('util'),
spawn = require('child_process').spawn,
sort = spawn('sort', ['-k3,3', './test.tsv']);
sort.stdout.on('data', function (data) {
// process data
data.toString()
.split('\n')
.map(line => line.split("\t"))
.forEach(record => console.info(`Record: ${record}`));
});
sort.on('exit', function (code) {
if (code) {
// handle error
}
console.log('Done');
});
// optional
sort.stderr.on('data', function (data) {
// handle error...
console.log('stderr: ' + data);
});
希望这对您有所帮助:)
编辑: 添加一些基准细节。
我很好奇这会有多大的不同,所以我 运行 一些基准。
这是结果(运行 在 MacBook Pro 上):
sort1 使用一种直接的方法,将记录排序为
in-memory array
.
平均时间:35.6s(基线)sort2 按照 Joe Krill 的建议使用
sort-stream
。
平均时间:11.1m(大约慢 x18.7 倍)
(不知道为什么,我没有深究。)
根据 Tamas Hegedus 的建议,sort3 使用
nedb
。
时间:大约16m(大约慢x27倍)sort4 仅通过在终端
中执行sort -k 3,3 input.txt > out4.txt
进行排序 平均时间:1.2s(大约 x30 倍)sort5 使用
sort -k3,3
,并处理发送到标准输出的响应
平均时间:3.65s(大约 x9.7 倍)