按组连接字符串

Concat String by Group

我想按 _id 对记录进行分组,并通过组合 client_id 个值创建一个字符串。

以下是我的文档示例:

{
  "_id" : ObjectId("59e955e633d64c81875bfd2f"),
  "tag_id" : 1,
  "client_id" : "10001"
}
{
  "_id" : ObjectId("59e955e633d64c81875bfd30"),
  "tag_id" : 1,
  "client_id" : "10002"
}

我想要这样的输出:

{
  "_id" : 1
  "client_id" : "10001,10002"
}

您可以将聚合框架作为 "two step" 操作来完成。这是首先通过 $push withing a $group pipeline, and then to use $concat with $reduce 在最终投影中生成的数组上将项目累加到数组中:

db.collection.aggregate([
  { "$group": {
    "_id": "$tag_id",
    "client_id": { "$push": "$client_id" }
  }},
  { "$addFields": {
    "client_id": {
      "$reduce": {
        "input": "$client_id",
        "initialValue": "",
        "in": {
          "$cond": {
            "if": { "$eq": [ "$$value", "" ] },
            "then": "$$this",
            "else": {
              "$concat": ["$$value", ",", "$$this"]
            }
          }
        }
      }
    }
  }}
])

我们也在此处应用 $cond 以避免在结果中将空字符串与逗号连接起来,因此它看起来更像是一个分隔列表。

仅供参考 上面有一个 JIRA 问题 SERVER-29339 which does ask for $reduce to be implemented as an accumulator expression to allow it's use directly in a $group pipeline stage. Not likely to happen any time soon, but it theoretically would replace $push 并使操作成为单个管道阶段。建议语法示例在 JIRA 问题上。

如果您没有 $reduce(需要 MongoDB 3.4),那么只需 post 处理光标:

db.collection.aggregate([
  { "$group": {
    "_id": "$tag_id",
    "client_id": { "$push": "$client_id" }
  }},
]).map( doc =>
  Object.assign(
    doc,
   { "client_id": doc.client_id.join(",") }
  )
)

如果你真的必须的话,这会导致使用 mapReduce 执行此操作的另一种选择:

db.collection.mapReduce(
  function() {
    emit(this.tag_id,this.client_id);
  },
  function(key,values) {
    return [].concat.apply([],values.map(v => v.split(","))).join(",");
  },
  { "out": { "inline": 1 } }
)

当然以_idvalue的特定mapReduce形式作为键集输出,但它基本上是输出。

我们使用 [].concat.apply([],values.map(...)) 因为 "reducer" 的输出可以是 "delimited string" 因为 mapReduce 以增量方式处理大量结果,因此 reducer 的输出可以变成 "input" 另一遍。所以我们需要预料到这会发生并相应地对待它。

Mongo 4.4 开始,$group 阶段有一个新的聚合运算符 $accumulator 允许在文档分组时自定义累积:

// { "tag_id" : 1, "client_id" : "10001" }
// { "tag_id" : 1, "client_id" : "10002" }
// { "tag_id" : 2, "client_id" : "9999"  }
db.collection.aggregate([
  { $group: {
    _id: "$tag_id",
    client_id: {
      $accumulator: {
        accumulateArgs: ["$client_id"],
        init: function() { return [] },
        accumulate: function(ids, id) { return ids.concat(id) },
        merge: function(ids1, ids2) { return ids1.concat(ids2) },
        finalize: function(ids) { return ids.join(",") },
        lang: "js"
      }
    }
  }}
])
// { "_id" : 2, "client_id" : "9999" }
// { "_id" : 1, "client_id" : "10001,10002" }

累加器:

  • 场上累积 client_id (accumulateArgs)
  • 初始化为空数组(init)
  • 通过将新 ID 与已经看到的 ID 连接到新 ID(accumulatemerge)来累积
  • 最后将所有的id连接成一个字符串(finalize)