猫鼬找到一个并推送到文档数组

Mongoose find one and push to array of documents

我是 MongoDB 和 Mongoose 的新手,我正在尝试使用它来保存股票报价以供日间交易分析。所以我想象了这个架构:

symbolSchema = Schema({
    name:String,
    code:String
});

quoteSchema = Schema({
    date:{type:Date, default: now},
    open:Number, 
    high:Number,
    low:Number,
    close:Number,
    volume:Number
});

intradayQuotesSchema = Schema({
    id_symbol:{type:Schema.Types.ObjectId, ref:"symbol"},
    day:Date,
    quotes:[quotesSchema]
});

从我的 link 我每分钟收到这样的信息:

日期 |符号 |打开|高 |低 |关闭 |音量

2015-03-09 13:23:00|AAPL|127,14|127,17|127,12|127,15|19734

我必须:

  1. 找到符号 (AAPL) 的 ObjectId。
  2. 判断该交易品种的intradayQuote文件是否已经存在(交易品种和日期组合)
  3. 判断quotes数组中是否存在该品种的分钟OHLCV数据(因为可以重复)
  4. 更新或创建文档并更新或创建数组内的引号

如果引号已经存在,我可以在不费力的情况下完成这个任务,但是这个方法可以在引号数组中创建重复的条目:

symbol.find({"code":mySymbol}, function(err, stock) {
    intradayQuote.findOneAndUpdate({
        { id_symbol:stock[0]._id, day: myDay },
        { $push: { quotes: myQuotes } },
        { upsert: true },
        myCallback
    });
});

我已经试过了:

有没有一种方法可以在不使用更多查询(我已经在使用 2 个)的情况下使它正常工作?我是否应该重新考虑我的架构以促进这项工作?任何帮助将不胜感激。谢谢!

基本上把 $addToSet operator cannot work for you because your data is not a true "set" 定义为 "completely distinct" 个对象的集合。

这里的另一个逻辑意义是,您将在数据到达时对其进行处理,无论是作为单个对象还是提要。我假设它是某种形式的许多项目的提要,并且您可以使用某种流处理器来为每个收到的文档达到此结构:

{
    "date": new Date("2015-03-09 13:23:00.000Z"),
    "symbol": "AAPL",
    "open": 127.14
    "high": 127.17,
    "low": 127.12 
    "close": 127.15,
    "volume": 19734
}

转换为标准十进制格式和 UTC 日期,因为一旦从数据存储中检索到数据,任何区域设置实际上都应该是您应用程序的域。

我也至少会通过删除对其他集合的引用并将数据放在那里来使您的 "intraDayQuoteSchema" 变平一点。您仍然需要在插入时进行查找,但读取时额外填充的开销似乎比存储开销更昂贵:

intradayQuotesSchema = Schema({
    symbol:{
        name: String,
        code: String
    },
    day:Date,
    quotes:[quotesSchema]
});

这取决于您的使用模式,但这样可能更有效。

剩下的就是

可以接受的事情
stream.on(function(data) {

    var symbol = data.symbol,
        myDay = new Date( 
            data.date.valueOf() - 
                ( data.date.valueOf() % 1000 * 60 * 60 * 24 ));
    delete data.symbol;

    symbol.findOne({ "code": symbol },function(err,stock) {

        intraDayQuote.findOneAndUpdate(
            { "symbol.code": symbol , "day": myDay },
            { "$setOnInsert": { 
               "symbol.name": stock.name
               "quotes": [data] 
            }},
            { "upsert": true }
            function(err,doc) {
                intraDayQuote.findOneAndUpdate(
                    {
                        "symbol.code": symbol,
                        "day": myDay,
                        "quotes.date": data.date
                    },
                    { "$set": { "quotes.$": data } },
                    function(err,doc) {
                        intraDayQuote.findOneAndUpdate(
                            {
                                "symbol.code": symbol,
                                "day": myDay,
                                "quotes.date": { "$ne": data.date }
                            },
                            { "$push": { "quotes": data } },
                            function(err,doc) {

                            }
                       );    
                    }
                );
            }
        );    
    });
});

如果您实际上不需要在响应中修改文档,那么您可以通过在此处实施批量操作 API 并在单个数据库请求中发送此包中的所有更新来获得一些好处:

stream.on("data",function(data) {

    var symbol = data.symbol,
        myDay = new Date( 
            data.date.valueOf() - 
                ( data.date.valueOf() % 1000 * 60 * 60 * 24 ));
    delete data.symbol;

     symbol.findOne({ "code": symbol },function(err,stock) {
         var bulk = intraDayQuote.collection.initializeOrderedBulkOp();
         bulk.find({ "symbol.code": symbol , "day": myDay })
             .upsert().updateOne({
                 "$setOnInsert": { 
                     "symbol.name": stock.name
                     "quotes": [data] 
                 }
             });

         bulk.find({
             "symbol.code": symbol,
             "day": myDay,
             "quotes.date": data.date
         }).updateOne({
             "$set": { "quotes.$": data }
         });

         bulk.find({
             "symbol.code": symbol,
             "day": myDay,
             "quotes.date": { "$ne": data.date }
         }).updateOne({
             "$push": { "quotes": data }
         });

         bulk.execute(function(err,result) {
             // maybe do something with the response
         });            
     });
});

重点是只有其中一个语句会实际修改数据,并且由于这些都是在同一个请求中发送的,所以应用程序和服务器之间的来回更少。

另一种情况是,在这种情况下,在另一个集合中引用实际数据可能更简单。然后这就变成了处理更新插入的简单问题:

intradayQuotesSchema = Schema({
    symbol:{
        name: String,
        code: String
    },
    day:Date,
    quotes:[{ type: Schema.Types.ObjectId, ref: "quote" }]
});


// and in the steam processor

stream.on("data",function(data) {

    var symbol = data.symbol,
        myDay = new Date( 
            data.date.valueOf() - 
                ( data.date.valueOf() % 1000 * 60 * 60 * 24 ));
    delete data.symbol;

    symbol.findOne({ "code": symbol },function(err,stock) {
         quote.update(
            { "date": data.date },
            { "$setOnInsert": data },
            { "upsert": true },
            function(err,num,raw) {
                if ( !raw.updatedExisting ) {
                    intraDayQuote.update(
                        { "symbol.code": symbol , "day": myDay },
                        { 
                            "$setOnInsert": {
                                "symbol.name": stock.name
                            },
                            "$addToSet": { "quotes": data }
                        },
                        { "upsert": true },
                        function(err,num,raw) {

                        }
                    );
                }
            }
        );
    });
});

真正归结为将引号数据嵌套在 "day" 文档中对您有多重要。主要区别在于,如果您想根据其中一些 "quote" 字段的数据查询这些文档,或者以其他方式忍受使用 .populate() 从其他集合中提取 "quotes" 的开销.

当然,如果引用并且报价数据对您的查询过滤很重要,那么您始终可以查询该集合以查找匹配的 _id 值,并在 $in 上使用查询=49=] 文档只匹配包含那些匹配 "quote" 文档的日期。

根据您的应用程序使用数据的方式,选择哪条路径最为重要,这是一个重大决定。希望这会指导您完成您想要实现的目标背后的一般概念。

P.S 除非您 "sure" 您的源数据始终是一个四舍五入到精确 "minute" 的日期,否则您可能想要使用与所使用的相同类型的日期舍入数学也得到离散 "day"。