如何提取不在数组类型字段中的值的总数

How to extract total count for values not in array type fields

我将完整的消息线程(包括消息)存储为单个文档。数组字段 participants 包含参与者用户 ID。每条消息都有数组字段 read_by,其中包含读取该消息的用户 ID。

示例数据:

db.threads_test.insert( { "subject" : "subject 1", "participants" : ["u1", "u2"], "messages" : [
{"message_id" : "m1", "message" : "msg 1", "read_by" : ["u1"]},
{"message_id" : "m2", "message" : "msg 2", "read_by" : ["u2"]}
]});

db.threads_test.insert( { "subject" : "subject 2", "participants" : ["u1", "u2"], "messages" : [
{"message_id" : "m3", "message" : "msg 3", "read_by" : ["u1"]},
{"message_id" : "m4", "message" : "msg 4", "read_by" : ["u1"]}
]});

db.threads_test.insert( { "subject" : "subject 3", "participants" : ["u1", "u3"], "messages" : [
{"message_id" : "m5", "message" : "msg 5", "read_by" : ["u1", "u3"]}
]});

我需要找出用户有多少未读线程和多少未读消息。

解读逻辑是这样的:

每个用户的预期未读计数:

u1: threds=1, messages=1
u2: threads=2, messages=3
u3: threads=0, messages=0

我一直在检查聚合框架,但找不到解决方案。

Mongo版本为2.4.9

我认为你这样做是错误的(我和我的意见可能是错误的)。

创建多个集合可能会更好。为什么不让 table 调用 threadsmessagesuser_read:

Collection threads:
{thread_id: "...", subject: "...", participants: ["u1", "u2"], ...}
{thread_id: "...", subject: "...", participants: ["u1", "u3"], ...}
...

Collection messages:
{thread_id: "...", user_id: "...", message: "..."}
{thread_id: "...", user_id: "...", message: "..."}
...


Collection user_read:
{user_id: "u1", type: "thread", id: "..."}
{user_id: "u1", type: "message", id: "..."} # Care here the type is a message
...

现在您知道在第一个集合中计数有 X 个线程(我们称之为 nbrThreads)。您可以在 user_read 中轻松计算 "u1" 读取的线程数(使用 type=='thread')(我们称之为 userThreadsRead)。因此:

unreadThread = nbrThreads-userThreadsRead

通过在第二个集合中计数和在 user_read table 中(使用 type=='message')对消息进行相同的逻辑。调用这些变量 nbrMessages 和 userMessagesRead

unreadMessage = nbrMessages-userMessagesRead

此外,如果您不能更改结构,我建议您重新组织一下您的结构,以便能够执行类似的操作。我应该可以通过获取数组的长度来对你的结构做同样的事情。

此致!

这对于聚合框架来说不是一件容易的事,主要是因为有很多数组,所以有很多方法可以很容易地弄错。幸运的是,当你仔细观察时,这里有一个合理的逻辑模式,因为它只是归结为一个比较点:

db.threads_test.aggregate([
    // Unwind all arrays
    { "$unwind": "$messages" },
    { "$unwind": "$messages.read_by" },
    { "$unwind": "$participants" },

    // Group on distinct "message_id" comparing "particpant" and "read_by"
    { "$group": {
        "_id": {
            "_id": "$_id",
            "participant": "$participants",
            "message_id": "$messages.message_id"
        },
        "unread": { 
            "$min": {
                "$cond": [
                    { "$ne": [ "$participants", "$messages.read_by" ] },
                    1,
                    0
                ]
            }
        }
    }},

    // Get a sum of unread per thread
    { "$group": {
        "_id": {
            "_id": "$_id._id",
            "participant": "$_id.participant",
        },
        "unread": { "$sum": "$unread" }
    }},

    // Sum per participant counting unread threads
    { "$group": {
        "_id": "$_id.participant",
        "threads": { 
            "$sum": { 
                "$cond": [
                    { "$ne": [ "$unread", 0 ] },
                    1,
                    0
                ] 
            }       
        },
        "unread": { "$sum": "$unread" }
    }}
])

结果是:

{ "_id" : "u2", "threads" : 2, "unread" : 3 }
{ "_id" : "u3", "threads" : 0, "unread" : 0 }
{ "_id" : "u1", "threads" : 1, "unread" : 1 }

在每个数组上处理的第一个 $group stage there is critical. Ater $unwind 将涉及大量重复。幸运的是,"thread" 和 "message" 的每个级别都有自己不同的 "id" 值。连同独特的 "participants" 本身,这是一个关键点。

当你查看 "unwound" 形式的数据时,你应该能够通过所有重复看到这里的 "key test" 是比较 "participant" 和 "read_by" 值以查看它们是否相同。就像 "processing loops" 一样(除了所有的组合都没有列出),那么你只需要 return "once" 对于给定的消息,其中 "participant" 和 "ready_by" 是 "equal".

这解释了 "grouping" 组合。对于由 "thread"、"participant" 和 "message_id" 组成的 "key",您只需要该比较的 $min 数值结果"read_by"。因此,如果至少 "read_by" 中的 "one" 匹配,则计数为 1 否则为 0.

接下来的阶段只是对您的总数进行仔细分组。首先获取每个线程的总 "unread" 计数,然后将具有未读消息的线程计数到最终的 "participant" 分组键。

因此,虽然这不是 "always" 找到解决方案的途径,但在开始时执行所有 $unwind 操作是您可视化数据的好方法,这样您就可以理解解决方案。


替代方法

正如您所说,您有 MongoDB 2.4 可用于此,并且根据您的集合的大小,然后像这样使用 $unwind 进行处理可能会导致大量开销。更高版本对此有一些规定,但这可能是一个问题。

我之前提到过 "processing loops",这正是您可以用 mapReduce 做的事情。

虽然此处通常首选聚合框架,但如果大小受到限制,您可能需要考虑这一点:

db.threads_test.mapReduce(
    function () {
      var doc = this;
      doc.participants.forEach(function(participant) {
        doc.messages.forEach(function(message) {
          var obj = {
            threads: [],
            unread: 0
          };

          if ( message.read_by.indexOf(participant) == -1 ) {
            obj.threads.push(doc._id.valueOf());
            obj.unread = 1;
          }

          emit(participant,obj);
        })
      })
    },
    function (key,values) {

      var result = { "threads": [], "unread": 0 };

      values.forEach(function(value) {
        value.threads.forEach(function(thread) {
          if ( result.threads.indexOf(thread) == -1 )
            result.threads.push(thread);
        })
        result.unread += value.unread;
      });

      return result;

    },
    { 
       "finalize": function(key,value) {
           value.threads = value.threads.length;
           return value;
       },
       "out": { "inline": 1 }
    }
)

这里真的是一样的。每条消息的线程上的每个参与者都会将它们与 "read_by" 列表进行比较,以查看它们是否在其中。我们在消息未读时发出 "thread id",如果 "unread" 则发出结果。这是针对线程上的每条消息与参与者一起发出的。所以 "loop of loops".

结果是 "reduced",方法是从 "threads" 中提取不同的值并对参与者的未读消息求和。

由于 "threads" 是一个不同的 "id" 值列表,我们只需要缩减后该列表的长度。这就是 "finalize" 在这里所做的,并将列表转换为其长度的数值。

相同的结果,但不如这是 mapreduce 的限制:

    "results" : [
            {
                    "_id" : "u1",
                    "value" : {
                            "threads" : 1,
                            "unread" : 1
                    }
            },
            {
                    "_id" : "u2",
                    "value" : {
                            "threads" : 2,
                            "unread" : 3
                    }
            },
            {
                    "_id" : "u3",
                    "value" : {
                            "threads" : 0,
                            "unread" : 0
                    }
            }
    ],

无论哪一个最适合你,现在对于程序大脑和聚合大脑来说,问题的解决方案应该都很清楚了