index/ingest 非常大的 json 文件到数据库 node.js

index/ingest very large json file to database with node.js

我继承了一个巨大的 json 文件,我试图将其索引到 elasticsearch(不是真正的数据库,但不要挂在 es 上,它应该适用于大多数数据库摄取)。我正在使用节点进行摄取。我已经尝试过流和异步,但我很困惑我没有解决这个问题的框架 - 没有内存溢出等。

我不能 post 一对一,但它实际上是一个多维对象,看起来像:

[ 
   { 
     document: {
        type: 1,
        type2: 2,
        type3: {...}
    },
    {...}
]

我只需要摄取文档,我可以使用 elasticsearch 客户端并批量处理它们。我需要放慢流、解析和分块的速度。

完全卡住了...帮助 Whosebug 今天是星期五我想回家; ).

根据 migg 对 json-parse-stream 的建议——我尝试的第三个 json 流库——我终于有了一个工作摄取。事实上,在我写这篇文章时它是 运行。希望有人会觉得这很有用。

const fs = require('graceful-fs');
const parse = require('json-parse-stream');
const es = require('event-stream');
const client = new require('elasticsearch').Client();
var WritableBulk = require('elasticsearch-streams').WritableBulk;
var TransformToBulk = require('elasticsearch-streams').TransformToBulk;


var rs = fs.createReadStream('./resources/mydoc.json');

var bulkExec = function (body, callback) {
  console.log(body);
  client.bulk({
    index: 'my_index',
    type: 'my_type',
    body: body
  }, callback);
};

var toBulk = new TransformToBulk(() => { return { _index: 'my_index', _type: 'my_type' }; });


const done = (err, res) =>  {
  if (err) {
    console.log(err);
  }
  console.log(res);
  console.log('go get a drink you deserve it');
};

var ws = new WritableBulk(bulkExec);

rs.pipe(parse())
.pipe(es.mapSync(function (element) {
  var a =  [];
  if (element.key === 'document') {
    a = element.value;
    return a;
  }
}))
.pipe(toBulk)
.pipe(ws).on('finish', done);