在写入文件之前对数据流进行排序

Sorting a data stream before writing to file in nodejs

我有一个可能包含多达 1M 条记录的输入文件,每条记录如下所示

field 1 field 2 field3 \n

我想读取此输入文件并根据 field3 对其进行排序,然后再将其写入另一个文件。

这是我目前所拥有的

var fs = require('fs'),
    readline = require('readline'),
    stream = require('stream');

var start = Date.now();

var outstream = new stream;
outstream.readable = true;
outstream.writable = true;

var rl = readline.createInterface({
    input: fs.createReadStream('cross.txt'),
    output: outstream,
    terminal: false
});

rl.on('line', function(line) {
    //var tmp = line.split("\t").reverse().join('\t') + '\n';
    //fs.appendFileSync("op_rev.txt", tmp );
    // this logic to reverse and then sort is too slow
});

rl.on('close', function() {
    var closetime = Date.now();
    console.log('Read entirefile. ', (closetime - start)/1000, ' secs');
});

我基本上卡在这一点上了,我只能从一个文件读取并写入另一个文件,有没有办法在写入之前有效地排序这些数据

我遇到了非常相似的问题,需要执行 external sort

我想通了,在浪费了一些时间之后,我可以将数据加载到数据库中,然后从中查询出所需的数据。

即使插入没有排序也没关系,只要我的查询结果可以。

希望它也对你有用。

为了将数据插入数据库,节点上有很多工具可以执行此类任务。我有 this pet project 做类似的工作。

我也相信,如果您 search 这个主题,您会找到更多信息。

祝你好运。

您有两种选择,具体取决于要处理的数据量。 (3 列的 1M 记录数并不能说明实际数据量)

加载内存中的数据,就地排序

var lines = [];
rl.on('line', function(line) {
    lines.push(line.split("\t").reverse());
});

rl.on('close', function() {
    lines.sort(function(a, b) { return compare(a[0], b[0]); });

    // write however you want
    fs.writeFileSync(
        fileName,
        lines.map(function(x) { return x.join("\t"); }).join("\n")
    );
    function compare(a, b) {
        if (a < b) return -1;
        if (a > b) return 1;
        return 0;
    }
});

在持久数据库中加载数据,按顺序读取

使用您选择的数据库引擎(例如 nedb,用于 nodejs 的纯 javascript 数据库)

EDIT: NeDB 似乎将整个数据库保存在内存中,文件只是数据的持久副本。我们将不得不寻找另一种实现方式。 TingoDB 看起来很有希望。

// This code is only to give an idea, not tested in any way

var Datastore = require('nedb');
var db = new Datastore({
    filename: 'path/to/temp/datafile',
    autoload: true
});

rl.on('line', function(line) {
    var tmp = line.split("\t").reverse();
    db.insert({
        field0: tmp[0],
        field1: tmp[1],
        field2: tmp[2]
    });
});

rl.on('close', function() {
    var cursor = db.find({})
            .sort({ field0: 1 }); // sort by field0, ascending
    var PAGE_SIZE = 1000;
    paginate(0);
    function paginate(i) {
        cursor.skip(i).take(PAGE_SIZE).exec(function(err, docs) {
            // handle errors

            var tmp = docs.map(function(o) {
                return o.field0 + "\t" + o.field1 + "\t" + o.field2 + "\n";
            });
            fs.appendFileSync("op_rev.txt", tmp.join(""));
            if (docs.length >= PAGE_SIZE) {
                paginate(i + PAGE_SIZE);
            } else {
                // cleanup temp database
            }
        });
    }
});

你可以利用流来做这样的事情。有一些 NPM 模块会有所帮助——首先通过 运行

包含它们
npm install sort-stream csv-parse stream-transform

从命令行。

然后:

var fs = require('fs');
var sort = require('sort-stream');
var parse = require('csv-parse');
var transform = require('stream-transform');

// Create a readble stream from the input file.
fs.createReadStream('./cross.txt')
  // Use `csv-parse` to parse the input using a tab character (\t) as the 
  // delimiter. This produces a record for each row which is an array of 
  // field values.
  .pipe(parse({
    delimiter: '\t'
  }))
  // Use `sort-stream` to sort the parsed records on the third field. 
  .pipe(sort(function (a, b) {
    return a[2].localeCompare(b[2]);
  }))
  // Use `stream-transform` to transform each record (an array of fields) into 
  // a single tab-delimited string to be output to our destination text file.
  .pipe(transform(function(row) {
    return row.join('\t') + '\r';
  }))
  // And finally, output those strings to our destination file.
  .pipe(fs.createWriteStream('./cross_sorted.txt'));

DBsort-stream 是很好的解决方案, 但是 DB 可能有点矫枉过正,我认为 sort-stream 最终只是对整个文件进行排序在内存数组中(在 through 结束回调上),所以我认为与原始解决方案相比,性能将大致相同。
(但我没有 运行 任何基准,所以我可能是错的)。

所以,为了简单起见,我将提供另一个解决方案:)


编辑: 我很好奇这会有多大差异,所以我 运行 一些基准。

结果甚至让我感到惊讶,事实证明 sort -k3,3 解决方案到目前为止更好, 比原始解决方案快 10 倍(简单的数组排序),而nedbsort-stream 解决方案比原始解决方案至少慢 x18 倍(即至少比 sort -k3,3 慢 x180 倍)。

(参见下面的基准测试结果)


如果在 *nix 机器上 (Unix, Linux, Mac, ...) 你可以简单地使用
sort -k 3,3 yourInputFile > op_rev.txt 让 OS 为您排序。
您可能会获得更好的性能,因为排序是本机完成的。

或者,如果您想在 Node 中处理排序后的输出:

var util = require('util'),
    spawn = require('child_process').spawn,
    sort = spawn('sort', ['-k3,3', './test.tsv']);

sort.stdout.on('data', function (data) {
    // process data
    data.toString()
        .split('\n')
        .map(line => line.split("\t"))
        .forEach(record => console.info(`Record: ${record}`));
});

sort.on('exit', function (code) {
    if (code) {
        // handle error
    }

    console.log('Done');
});

// optional
sort.stderr.on('data', function (data) {
    // handle error...
    console.log('stderr: ' + data);
});

希望这对您有所帮助:)


编辑: 添加一些基准细节。

我很好奇这会有多大的不同,所以我 运行 一些基准。

这是结果(运行 在 MacBook Pro 上):

  • sort1 使用一种直接的方法,将记录排序为 in-memory array.
    平均时间:35.6s(基线)

  • sort2 按照 Joe Krill 的建议使用 sort-stream
    平均时间:11.1m(大约慢 x18.7 倍
    (不知道为什么,我没有深究。)

  • 根据 Tamas Hegedus 的建议,
  • sort3 使用 nedb
    时间:大约16m(大约慢x27倍

  • sort4 仅通过在终端
    中执行sort -k 3,3 input.txt > out4.txt 进行排序 平均时间:1.2s(大约 x30 倍

  • sort5 使用 sort -k3,3,并处理发送到标准输出的响应
    平均时间:3.65s(大约 x9.7 倍