index/ingest 非常大的 json 文件到数据库 node.js
index/ingest very large json file to database with node.js
我继承了一个巨大的 json 文件,我试图将其索引到 elasticsearch(不是真正的数据库,但不要挂在 es 上,它应该适用于大多数数据库摄取)。我正在使用节点进行摄取。我已经尝试过流和异步,但我很困惑我没有解决这个问题的框架 - 没有内存溢出等。
我不能 post 一对一,但它实际上是一个多维对象,看起来像:
[
{
document: {
type: 1,
type2: 2,
type3: {...}
},
{...}
]
我只需要摄取文档,我可以使用 elasticsearch 客户端并批量处理它们。我需要放慢流、解析和分块的速度。
完全卡住了...帮助 Whosebug 今天是星期五我想回家; ).
根据 migg 对 json-parse-stream 的建议——我尝试的第三个 json 流库——我终于有了一个工作摄取。事实上,在我写这篇文章时它是 运行。希望有人会觉得这很有用。
const fs = require('graceful-fs');
const parse = require('json-parse-stream');
const es = require('event-stream');
const client = new require('elasticsearch').Client();
var WritableBulk = require('elasticsearch-streams').WritableBulk;
var TransformToBulk = require('elasticsearch-streams').TransformToBulk;
var rs = fs.createReadStream('./resources/mydoc.json');
var bulkExec = function (body, callback) {
console.log(body);
client.bulk({
index: 'my_index',
type: 'my_type',
body: body
}, callback);
};
var toBulk = new TransformToBulk(() => { return { _index: 'my_index', _type: 'my_type' }; });
const done = (err, res) => {
if (err) {
console.log(err);
}
console.log(res);
console.log('go get a drink you deserve it');
};
var ws = new WritableBulk(bulkExec);
rs.pipe(parse())
.pipe(es.mapSync(function (element) {
var a = [];
if (element.key === 'document') {
a = element.value;
return a;
}
}))
.pipe(toBulk)
.pipe(ws).on('finish', done);
我继承了一个巨大的 json 文件,我试图将其索引到 elasticsearch(不是真正的数据库,但不要挂在 es 上,它应该适用于大多数数据库摄取)。我正在使用节点进行摄取。我已经尝试过流和异步,但我很困惑我没有解决这个问题的框架 - 没有内存溢出等。
我不能 post 一对一,但它实际上是一个多维对象,看起来像:
[
{
document: {
type: 1,
type2: 2,
type3: {...}
},
{...}
]
我只需要摄取文档,我可以使用 elasticsearch 客户端并批量处理它们。我需要放慢流、解析和分块的速度。
完全卡住了...帮助 Whosebug 今天是星期五我想回家; ).
根据 migg 对 json-parse-stream 的建议——我尝试的第三个 json 流库——我终于有了一个工作摄取。事实上,在我写这篇文章时它是 运行。希望有人会觉得这很有用。
const fs = require('graceful-fs');
const parse = require('json-parse-stream');
const es = require('event-stream');
const client = new require('elasticsearch').Client();
var WritableBulk = require('elasticsearch-streams').WritableBulk;
var TransformToBulk = require('elasticsearch-streams').TransformToBulk;
var rs = fs.createReadStream('./resources/mydoc.json');
var bulkExec = function (body, callback) {
console.log(body);
client.bulk({
index: 'my_index',
type: 'my_type',
body: body
}, callback);
};
var toBulk = new TransformToBulk(() => { return { _index: 'my_index', _type: 'my_type' }; });
const done = (err, res) => {
if (err) {
console.log(err);
}
console.log(res);
console.log('go get a drink you deserve it');
};
var ws = new WritableBulk(bulkExec);
rs.pipe(parse())
.pipe(es.mapSync(function (element) {
var a = [];
if (element.key === 'document') {
a = element.value;
return a;
}
}))
.pipe(toBulk)
.pipe(ws).on('finish', done);