基于 $lookup 检索的字段的多阶段聚合管道匹配数据

multi-stage aggregation pipeline matching data based on fields retrieved through $lookup

我正在尝试在 MongoDB(4.4.9 社区版,使用 Python 3.10 的 pymongo 驱动程序)中构建一个复杂的嵌套聚合管道。

我想将不同集合中的相关数据点聚合到一个新的(理想的)视图(或者,如果这不起作用)集合中。

集合及其中的相关字段遵循层次结构。有members,其中包含要合并其他数据的顶级键, membershipNumber.

> members.find_one()
{'_id': ObjectId('61153299af6122XXXXXXXXXXXXX'), 'membershipNumber': 'N03XXXXXX'}

然后,有一个不同的集合,其中包含 membershipNumber,还有一个不同的 linked 字段 an_user_idan_user_id 在其他集合中用于表示属于该特定用户的数组中的 records/fields。

我'join'membersan_users是这样的:

result = members.aggregate([
    {
        '$lookup': {
            'from': 'an_users',                   
            'localField': 'membershipNumber',     
            'foreignField': 'memref',             
            'as': 'an_users'                      
        }
    },
    {   '$unwind' : '$an_users' },     
    {   
        '$project' : {
            '_id' : 1,
            'membershipNumber' : 1,
            'an_user_id' : '$an_users.user_id'
        } 
    }
]);

到目前为止一切顺利,returns 所需的聚合记录:

{'_id': ObjectId('61153253aBBBBBBBBBBBB'),
  'membershipNumber': 'N0XXXXXXXX',
  'an_user_id': '48XXXXXX'}

现在,我有第三个集合,其中包含 an_user_id 作为数组中的字符串,表示用户点击给定电子邮件的位置,其中记录是电子邮件(clicks 数组中的 an_user_id 是单击该电子邮件中的 link 的用户。

{'_id': ObjectId('blah'),
 'email_id': '407XXX',
 'actions_count': 17,
 'administrative_title': 'test',
 'bounce': ['3440XXXX'],
 'click': ['38294CCC',
  '418FFFF',
  '48XXXXXX',
  '38eGGGG'}

我想计算数组中给定 an_user_id(我通过聚合获得)的出现次数(例如 clicksbouncesopens ) 在 emails 集合中,并将其包含在 .aggregate 调用中,以检索如下内容:

{'_id': ObjectId('61153253aBBBBBBBBBBBB'),
  'membershipNumber': 'N0XXXXXXXX',
  'an_user_id': '48XXXXXX',
  'n_email_clicks' : 412,
  'n_email_bounces' : 12
}

此外,我可能还想在我的数据库中的其他集合中附加 an_user_id 的计数。

例如,考虑这个名为 events:

的集合
{
    "_id": "617ffa96ee11844e143a63dd",
    "id": "12345",
    "administrative_title": "my_event",
    "created_at": {
        "$date": "2020-01-15T16:28:50.000Z"
    },
    "event_creator_id": "123456",
    "event_title": "my_event",
    "group_id": "123456",
    "permalink": "event_id",
    "rsvp_count": 54,
    "rsvps": [{
        "rsvp_id": "56789",
        "display_name": "John Doe",
        "rsvp_user_id": "48XXXXXX",
        "rsvp_created_at": {
            "$date": "2020-01-28T15:38:50.000Z"
        },
        "rsvp_updated_at": {
            "$date": "2020-01-28T15:38:50.000Z"
        },
        "first_name": "John",
        "last_name": "Doe",
    }, {
        "rsvp_id": "543895",
        "display_name": "James Appleslice",
        "rsvp_user_id": "N03XXXXXX",
        "rsvp_created_at": {
            "$date": "2020-02-05T13:15:14.000Z"
        },
        "rsvp_updated_at": {
            "$date": "2020-02-05T13:15:14.000Z"
        },
        "first_name": "James",
        "last_name": "Appleslice"}
  ]
}

所以,最终产品看起来像这样:

{'_id': ObjectId('61153253aBBBBBBBBBBBB'),
  'membershipNumber': 'N0XXXXXXXX',
  'an_user_id': '48XXXXXX',
  'n_email_clicks' : 412,
  'n_email_bounces' : 12,
  'n_rsvps' : 12
}

我的想法是使用 $lookup 参数——但是,我只知道如何使用它来匹配我正在执行聚合的父集合中的字段,但不知道在聚合过程中生成的字段上。

任何帮助将不胜感激!

您可以使用 $lookup 管道。首先,您将 $lookup 用户 ID 后跟另一个 $lookup 以验证电子邮件中是否存在用户 ID。最后还有几个阶段可以根据您的需要收集结果和格式。此外,如果您想将结果写入另一个集合,可以添加 $out 阶段。

db.members.aggregate([{
  $lookup: {
    from: "an_users",
    let: {
      membershipNumber: "$membershipNumber"
    },
    pipeline: [
      {
        $match: {
          $expr: {
            $eq: [
              "$memref",
              "$$membershipNumber"
            ]
          },
          
        }
      },
      {
        "$lookup": {
          "from": "emails",
          "localField": "user_id",
          "foreignField": "click",
          "as": "clicks"
        }
      },
      {
        "$project": {
          "_id": 1,
          "membershipNumber": 1,
          "an_user_id": "$user_id",
          "n_email_clicks": {
            $size: "$clicks"
          }
        }
      }
    ],
    as: "details"
  }
},
{
  $replaceRoot: {
    newRoot: {
      $mergeObjects: [
        {
          $arrayElemAt: [
            "$details",
            0
          ]
        },
        "$$ROOT"
      ]
    }
  }
},
{
  $project: {
    details: 0
  }
}])

工作示例 - https://mongoplayground.net/p/yrFsNp44hpi