如何在读取回调处理程序中使用从节点流读取的数据添加其他数据?
How to add additional data with data read from a node stream in read callback handler?
我正在创建一个可读流数组(来自包含 JSON 文档的文件)并试图将它们通过管道传输到另一个流。
文件中的数据正在通过……但对于我在管道流中收到的每个对象,我想知道该数据来自哪个文件:
var fs = require('fs');
var path = require('path');
var JSONStream = require('JSONStream');
var tmp1 = path.join(__dirname, 'data', 'tmp1.json');
var tmp2 = path.join(__dirname, 'data', 'tmp2.json');
var jsonStream = JSONStream.parse();
jsonStream.on('data', function (data) {
console.log('---\nFrom which file does this data come from?');
console.log(data);
});
[tmp1, tmp2].map(p => {
return fs.createReadStream(p);
}).forEach(stream => stream.pipe(jsonStream));
输出:
---
From which file does this data come from?
{ a: 1, b: 2 }
---
From which file does this data come from?
{ a: 3, b: 4 }
---
From which file does this data come from?
{ a: 5, b: 6 }
---
From which file does this data come from?
{ a: 100, b: 200 }
---
From which file does this data come from?
{ a: 300, b: 400 }
---
From which file does this data come from?
{ a: 500, b: 600 }
需要文件路径来进一步处理读取的对象(在 jsonStream.on('data') callback
中),但我不知道如何传递这些额外数据。
下面是一个可能的解决方案(我将其标记为答案,除非我得到更好的答案):
var fs = require('fs');
var path = require('path');
var JSONStream = require('JSONStream');
var through = require('through2');
var tmp1 = path.join(__dirname, 'data', 'tmp1.json');
var tmp2 = path.join(__dirname, 'data', 'tmp2.json');
[tmp1, tmp2]
.map(p => fs.createReadStream(p))
.map(stream => [path.parse(stream.path), stream.pipe(JSONStream.parse())])
.map(([parsedPath, jsonStream]) => {
return jsonStream.pipe(through.obj(function (obj, _, cb) {
this.push({
fileName: parsedPath.name,
data: obj
});
cb();
}));
})
.map(stream => {
stream.on('data', function (data) {
console.log(JSON.stringify(data, null, 2));
});
})
;
输出:
{
"fileName": "tmp1",
"data": {
"a": 1,
"b": 2
}
}
{
"fileName": "tmp1",
"data": {
"a": 3,
"b": 4
}
}
{
"fileName": "tmp1",
"data": {
"a": 5,
"b": 6
}
}
{
"fileName": "tmp2",
"data": {
"a": 100,
"b": 200
}
}
{
"fileName": "tmp2",
"data": {
"a": 300,
"b": 400
}
}
{
"fileName": "tmp2",
"data": {
"a": 500,
"b": 600
}
}
我正在创建一个可读流数组(来自包含 JSON 文档的文件)并试图将它们通过管道传输到另一个流。
文件中的数据正在通过……但对于我在管道流中收到的每个对象,我想知道该数据来自哪个文件:
var fs = require('fs');
var path = require('path');
var JSONStream = require('JSONStream');
var tmp1 = path.join(__dirname, 'data', 'tmp1.json');
var tmp2 = path.join(__dirname, 'data', 'tmp2.json');
var jsonStream = JSONStream.parse();
jsonStream.on('data', function (data) {
console.log('---\nFrom which file does this data come from?');
console.log(data);
});
[tmp1, tmp2].map(p => {
return fs.createReadStream(p);
}).forEach(stream => stream.pipe(jsonStream));
输出:
---
From which file does this data come from?
{ a: 1, b: 2 }
---
From which file does this data come from?
{ a: 3, b: 4 }
---
From which file does this data come from?
{ a: 5, b: 6 }
---
From which file does this data come from?
{ a: 100, b: 200 }
---
From which file does this data come from?
{ a: 300, b: 400 }
---
From which file does this data come from?
{ a: 500, b: 600 }
需要文件路径来进一步处理读取的对象(在 jsonStream.on('data') callback
中),但我不知道如何传递这些额外数据。
下面是一个可能的解决方案(我将其标记为答案,除非我得到更好的答案):
var fs = require('fs');
var path = require('path');
var JSONStream = require('JSONStream');
var through = require('through2');
var tmp1 = path.join(__dirname, 'data', 'tmp1.json');
var tmp2 = path.join(__dirname, 'data', 'tmp2.json');
[tmp1, tmp2]
.map(p => fs.createReadStream(p))
.map(stream => [path.parse(stream.path), stream.pipe(JSONStream.parse())])
.map(([parsedPath, jsonStream]) => {
return jsonStream.pipe(through.obj(function (obj, _, cb) {
this.push({
fileName: parsedPath.name,
data: obj
});
cb();
}));
})
.map(stream => {
stream.on('data', function (data) {
console.log(JSON.stringify(data, null, 2));
});
})
;
输出:
{
"fileName": "tmp1",
"data": {
"a": 1,
"b": 2
}
}
{
"fileName": "tmp1",
"data": {
"a": 3,
"b": 4
}
}
{
"fileName": "tmp1",
"data": {
"a": 5,
"b": 6
}
}
{
"fileName": "tmp2",
"data": {
"a": 100,
"b": 200
}
}
{
"fileName": "tmp2",
"data": {
"a": 300,
"b": 400
}
}
{
"fileName": "tmp2",
"data": {
"a": 500,
"b": 600
}
}