如何在读取回调处理程序中使用从节点流读取的数据添加其他数据?

How to add additional data with data read from a node stream in read callback handler?

我正在创建一个可读流数组(来自包含 JSON 文档的文件)并试图将它们通过管道传输到另一个流。

文件中的数据正在通过……但对于我在管道流中收到的每个对象,我想知道该数据来自哪个文件:

var fs = require('fs');
var path = require('path');
var JSONStream = require('JSONStream');

var tmp1 = path.join(__dirname, 'data', 'tmp1.json');
var tmp2 = path.join(__dirname, 'data', 'tmp2.json');

var jsonStream = JSONStream.parse();
jsonStream.on('data', function (data) {
  console.log('---\nFrom which file does this data come from?');
  console.log(data);
});

[tmp1, tmp2].map(p => {
  return fs.createReadStream(p);
}).forEach(stream => stream.pipe(jsonStream));

输出:

---
From which file does this data come from?
{ a: 1, b: 2 }
---
From which file does this data come from?
{ a: 3, b: 4 }
---
From which file does this data come from?
{ a: 5, b: 6 }
---
From which file does this data come from?
{ a: 100, b: 200 }
---
From which file does this data come from?
{ a: 300, b: 400 }
---
From which file does this data come from?
{ a: 500, b: 600 }

需要文件路径来进一步处理读取的对象(在 jsonStream.on('data') callback 中),但我不知道如何传递这些额外数据。

下面是一个可能的解决方案(我将其标记为答案,除非我得到更好的答案):

var fs = require('fs');
var path = require('path');
var JSONStream = require('JSONStream');
var through = require('through2');

var tmp1 = path.join(__dirname, 'data', 'tmp1.json');
var tmp2 = path.join(__dirname, 'data', 'tmp2.json');

[tmp1, tmp2]
  .map(p => fs.createReadStream(p))
  .map(stream => [path.parse(stream.path), stream.pipe(JSONStream.parse())])
  .map(([parsedPath, jsonStream]) => {
    return jsonStream.pipe(through.obj(function (obj, _, cb) {
      this.push({
        fileName: parsedPath.name,
        data: obj
      });
      cb();
    }));
  })
  .map(stream => {
    stream.on('data', function (data) {
      console.log(JSON.stringify(data, null, 2));
    });
  })
  ;

输出:

{
  "fileName": "tmp1",
  "data": {
    "a": 1,
    "b": 2
  }
}
{
  "fileName": "tmp1",
  "data": {
    "a": 3,
    "b": 4
  }
}
{
  "fileName": "tmp1",
  "data": {
    "a": 5,
    "b": 6
  }
}
{
  "fileName": "tmp2",
  "data": {
    "a": 100,
    "b": 200
  }
}
{
  "fileName": "tmp2",
  "data": {
    "a": 300,
    "b": 400
  }
}
{
  "fileName": "tmp2",
  "data": {
    "a": 500,
    "b": 600
  }
}