从多个集合中聚合

Aggregate out from Multiple Collections

如何在不替换另一个聚合输出的集合的情况下将 MongoDB 聚合的结果输出到集合中?

我只需要使用 $out: 'tempCollection' 获取数据,因为我有 5 亿个文档,并且获取 pipeline stage limit

var q = [
  {$match: query},
  {$group: {_id: '$hash'}},
  {$out: 'tempCollection'}
];

async.parallel([
  function(callback) {
    firstCollection.aggregate(q, callback);
  },
  function(callback) {
    secondCollection.aggregate(q, callback);
  },

  ...

], function() {

  // I want to get all from tempCollection (with pagination) here

});

这里的底线是 $out 选项只会 "replaces" 在目标集合上输出。因此,要执行任何其他操作,您必须通过客户端连接工作,而不仅仅是输出到服务器。

使用猫鼬的最佳选择是直接进入底层驱动程序并访问驱动程序支持的 node stream interface

简单的例子,但它展示了构造的基本方法:

var async = require('async'),
    mongoose = require('mongoose'),
    Schema = mongoose.Schema;

mongoose.connect('mongodb://localhost/aggtest');

var testSchema = new Schema({},{ "_id": false, strict: false });


var ModelA = mongoose.model( 'ModelA', testSchema ),
    ModelB = mongoose.model( 'ModelB', testSchema ),
    ModelC = mongoose.model( 'ModelC', testSchema );

function processCursor(cursor,target,callback) {

  cursor.on("end",callback);
  cursor.on("error",callback);

  cursor.on("data",function(data) {
    cursor.pause();
    target.update(
      { "_id": data._id },
      { "$setOnInsert": { "_id": data._id } },
      { "upsert": true },
      function(err) {
        if (err) callback(err);
        cursor.resume();
      }
    );
  });
}

async.series(
  [
    // Clean data
    function(callback) {
      async.each([ModelA,ModelB,ModelC],function(model,callback) {
        model.remove({},callback);
      },callback);
    },

    // Sample data
    function(callback) {
      async.each([ModelA,ModelB],function(model,callback) {
        async.each([1,2,3],function(id,callback) {
          model.create({ "_id": id },callback);
        },callback);
      },callback);
    },

    // Run merge
    function(callback) {
      async.parallel(
        [
          function(callback) {
            var cursor = ModelA.collection.aggregate(
              [
                { "$group": { "_id": "$_id" } }
              ],
              { "batchSize": 25 }
            );

            processCursor(cursor,ModelC,callback)
          },
          function(callback) {

            var cursor = ModelB.collection.aggregate(
              [
                { "$group": { "_id": "$_id" } }
              ],
              { "batchSize": 25 }
            );

            processCursor(cursor,ModelC,callback)
          }
        ],
        callback
      );
    },

    // Get merged
    function(callback) {
      ModelC.find({},function(err,results) {
        console.log(results);
        callback(err);
      });
    }
  ],
  function(err) {
    if (err) throw err;
    mongoose.disconnect();
  }
);

除此之外,您将需要 $out 到 "separate" 集合,然后将它们与类似的 .update() 过程合并,但要保留它 "server side" 那么你需要使用 .eval()

这不太好,但这是在服务器上保持操作的唯一方法。您还可以使用 "Bulk" 操作(再次通过相同的本机 .collection 接口)修改它以获得更多吞吐量。但选项归结为 "read through the client" 或 "eval"。