如何 mapreduce 具有相互关联的复杂子文档的对象
How can I mapreduce an object with complex subdocuments that relate to each other
首先,这可能是一个被误导的问题,如果是这种情况,我将不胜感激一些关于我应该如何进行的指导。
从我在网上找到的内容来看,mongodb/mongoose 的 mapReduce 似乎是执行此操作的最佳方法。我一直在努力弄懂它,但除了最简单的例子之外,我很难理解它的用法,所以想知道是否有人可以帮我解释一下我的问题。我不一定需要完整的解决方案,解释清楚的伪代码我也会非常感激。我认为让我特别困惑的是如何聚合并组合 2 个或更多集合的子文档。
另外我知道这可能是由于 model/collection 设计不好,但不幸的是,这完全不在我的掌控之中,所以请不要建议改造。
我的特殊问题是我们现有的模型如下所示:
survey: {
_id: 1111,
name: "name",
questions: [
{_id: 1, text: "a,b, or c?", type: "multipleChoice", options: [a, b, c,]},
{_id: 2, text: "what do you think", type: "freeform"}
],
participants: [{_id: 1, name: "user 1"}, {_id: 2, name: "user 2"}],
results: [{_id: 123, userId: 1, questionId: 1, answer: "a"},
{_id: 124, userId: 2, questionId: 1, answer: "b"},
{_id: 125, userId: 1, questionId: 2, answer: "this is some answer"},
{_id: 126, userId: 2, questionId: 2, answer: "this is another answer"}]
}
然后我们有另一个单独开发的模型,用于跟踪用户在整个调查过程中的进度(这只是一个基本子集,我们还跟踪不同的事件)
trackings:{
_id:123,
surveyId: 1,
userId: 123,
starttime: "2015-05-13 10:46:20.347Z"
endtime: "2015-05-13 10:59:20.347Z"
}
我想做的是得到类似的东西:
{
survey: "survey name",
_id : 1,
totalAverageTime: "00:23:00",
fastestTime : "00:23:00",
slowestTime: "00:25:00",
questions: [
{
_id: 1, text: "a,b, or c?",
type: "multipleChoice",
mostPopularAnswer: "a",
averageTime: "00:13:00",
answers : [{ userId: 1, answer: "a", time:"00:14:00"},
{ userId: 2, answer: "a", time:"00:12:00"}]
},{
_id: 2, text:"what do you think",
type:"freeform",
averageTime : "00:10:00",
answers : [{ userId: 1, answer: "this is some answer", time:"00:11:00"},
{ userId: 2, answer: "this is another answer", time:"00:09:00"}]
}
]
}
以下方法使用 aggregation framework 来得出更接近所需输出的解决方案。这取决于第三个集合,可以将其视为两个集合 survey
和 trackings
之间的合并。
首先,假设您有以下集合,其中包含基于您问题中示例的测试文档:
// survey collection: a single sample survey document with embedded
// questions, participants and results arrays.
db.survey.insert({
    _id: 1111,
    name: "name",
    questions: [
        {
            _id: 1,
            text: "a,b, or c?",
            type: "multipleChoice",
            options: ["a", "b", "c"]
        },
        {
            _id: 2,
            text: "what do you think",
            type: "freeform"
        }
    ],
    participants: [
        { _id: 1, name: "user 1" },
        { _id: 2, name: "user 2" }
    ],
    // userId / questionId hold the _id values of the participants and
    // questions listed above.
    results: [
        { _id: 123, userId: 1, questionId: 1, answer: "a" },
        { _id: 124, userId: 2, questionId: 1, answer: "b" },
        { _id: 125, userId: 1, questionId: 2, answer: "this is some answer" },
        { _id: 126, userId: 2, questionId: 2, answer: "this is another answer" }
    ]
});
// trackings collection: one timing record per user for survey 1111.
// starttime / endtime are inserted as plain strings, not Date objects.
db.trackings.insert([
    {
        _id: 1,
        surveyId: 1111,
        userId: 1,
        starttime: "2015-05-13 10:46:20.347Z",
        endtime: "2015-05-13 10:59:20.347Z"
    },
    {
        _id: 2,
        surveyId: 1111,
        userId: 2,
        starttime: "2015-05-13 10:13:06.176Z",
        endtime: "2015-05-13 10:46:28.176Z"
    }
]);
要创建第三个集合(我们称之为 output_collection
),您需要使用 find()
游标的 forEach()
方法遍历 trackings
集合,将存储日期字符串的字段转换为实际的 ISODate 对象,创建一个存储 survey
结果的数组字段,然后将合并后的对象保存到第三个集合中。下面演示这个操作:
// Build output_collection: every tracking record is copied over together
// with the survey document(s) it refers to and with its timestamps
// converted from strings to ISODate values.
db.trackings.find().forEach(function (tracking) {
    // toArray() keeps the lookup result as an array, so `survey` becomes an
    // array field on the merged document.
    tracking.survey = db.survey.find({ _id: tracking.surveyId }).toArray();

    // Replace the date strings with ISODate objects.
    tracking.starttime = ISODate(tracking.starttime);
    tracking.endtime = ISODate(tracking.endtime);

    db.output_collection.save(tracking);
});
将两个集合合并为 output_collection 后,使用 db.output_collection.findOne()
查询将得到:
{
"_id" : 1,
"surveyId" : 1111,
"userId" : 1,
"starttime" : ISODate("2015-05-13T10:46:20.347Z"),
"endtime" : ISODate("2015-05-13T10:59:20.347Z"),
"survey" : [
{
"_id" : 1111,
"name" : "name",
"questions" : [
{
"_id" : 1,
"text" : "a,b, or c?",
"type" : "multipleChoice",
"options" : [
"a",
"b",
"c"
]
},
{
"_id" : 2,
"text" : "what do you think",
"type" : "freeform"
}
],
"participants" : [
{
"_id" : 1,
"name" : "user 1"
},
{
"_id" : 2,
"name" : "user 2"
}
],
"results" : [
{
"_id" : 123,
"userId" : 1,
"questionId" : 1,
"answer" : "a"
},
{
"_id" : 124,
"userId" : 2,
"questionId" : 1,
"answer" : "b"
},
{
"_id" : 125,
"userId" : 1,
"questionId" : 2,
"answer" : "this is some answer"
},
{
"_id" : 126,
"userId" : 2,
"questionId" : 2,
"answer" : "this is another answer"
}
]
}
]
}
然后您可以在此集合上应用聚合。聚合管道应包含四个 $unwind
运算符阶段,它们解构输入文档中的数组字段,为数组的每个元素输出一个文档;在每个输出文档中,该数组被替换为其中的一个元素值。
接下来的 $project
运算符阶段会重塑流中的每个文档,例如添加一个新字段 duration
,它使用算术运算符计算 starttime 和 endtime 两个日期字段之间以分钟为单位的时间差。
之后是 $group
运算符管道阶段,它按 "surveyId"
键对输入文档进行分组,并应用累加器表达式。该阶段会消耗所有输入文档,并为每个不同的分组输出一个文档。
所以你的聚合管道应该是这样的:
// Summarise output_collection into one document per survey and write the
// result to the survey_results collection.
db.output_collection.aggregate([
    // Flatten the embedded survey array and each of its nested arrays so
    // that every element ends up in a document of its own.
    { $unwind: "$survey" },
    { $unwind: "$survey.questions" },
    { $unwind: "$survey.participants" },
    { $unwind: "$survey.results" },

    // Pass the listed fields through and add `duration`:
    // (endtime - starttime) divided by 1000 * 60, i.e. minutes when the
    // date difference is expressed in milliseconds.
    {
        $project: {
            survey: 1,
            surveyId: 1,
            userId: 1,
            starttime: 1,
            endtime: 1,
            duration: {
                $divide: [
                    { $subtract: ["$endtime", "$starttime"] },
                    1000 * 60
                ]
            }
        }
    },

    // One output document per surveyId with the timing statistics and the
    // distinct questions / results seen in that group.
    {
        $group: {
            _id: "$surveyId",
            survey: { $first: "$survey.name" },
            totalAverageTime: { $avg: "$duration" },
            fastestTime: { $min: "$duration" },
            slowestTime: { $max: "$duration" },
            // $addToSet keeps each question / result only once even though
            // the unwinds repeated them across many documents.
            questions: { $addToSet: "$survey.questions" },
            answers: { $addToSet: "$survey.results" }
        }
    },

    // Persist the grouped documents.
    { $out: "survey_results" }
]);

db.survey_results.find();
输出
/* 0 */
{
"result" : [
{
"_id" : 1111,
"survey" : "name",
"totalAverageTime" : 23.18333333333334,
"fastestTime" : 13,
"slowestTime" : 33.36666666666667,
"questions" : [
{
"_id" : 2,
"text" : "what do you think",
"type" : "freeform"
},
{
"_id" : 1,
"text" : "a,b, or c?",
"type" : "multipleChoice",
"options" : [
"a",
"b",
"c"
]
}
],
"answers" : [
{
"_id" : 126,
"userId" : 2,
"questionId" : 2,
"answer" : "this is another answer"
},
{
"_id" : 124,
"userId" : 2,
"questionId" : 1,
"answer" : "b"
},
{
"_id" : 125,
"userId" : 1,
"questionId" : 2,
"answer" : "this is some answer"
},
{
"_id" : 123,
"userId" : 1,
"questionId" : 1,
"answer" : "a"
}
]
}
],
"ok" : 1
}
更新
通过 $out
聚合管道阶段将聚合结果输出到另一个集合(比如 survey_results
)之后,您可以将一些原生 JavaScript 函数与 find()
游标的 forEach()
方法结合使用,以获得最终对象:
// Reshape every summary document in survey_results: attach to each question
// the answers whose questionId equals the question's _id, then drop the
// flat top-level `answers` array and save the document back.
db.survey_results.find().forEach(function (summary) {
    var nestedQuestions = summary.questions.map(function (question) {
        var ownAnswers = summary.answers.filter(function (answer) {
            return answer.questionId === question._id;
        });

        // The link field is redundant once the answer is nested under its
        // question, so it is removed from the answer object.
        ownAnswers.forEach(function (answer) {
            delete answer.questionId;
        });

        question.answers = ownAnswers;
        return question;
    });

    delete summary.answers;
    summary.questions = nestedQuestions;
    db.survey_results.save(summary);
});
输出:
/* 0 */
{
"_id" : 1111,
"survey" : "name",
"totalAverageTime" : 23.18333333333334,
"fastestTime" : 13,
"slowestTime" : 33.36666666666667,
"questions" : [
{
"_id" : 2,
"text" : "what do you think",
"type" : "freeform",
"answers" : [
{
"_id" : 126,
"userId" : 2,
"answer" : "this is another answer"
},
{
"_id" : 125,
"userId" : 1,
"answer" : "this is some answer"
}
]
},
{
"_id" : 1,
"text" : "a,b, or c?",
"type" : "multipleChoice",
"options" : [
"a",
"b",
"c"
],
"answers" : [
{
"_id" : 124,
"userId" : 2,
"answer" : "b"
},
{
"_id" : 123,
"userId" : 1,
"answer" : "a"
}
]
}
]
}
首先,这可能是一个被误导的问题,如果是这种情况,我将不胜感激一些关于我应该如何进行的指导。
从我在网上找到的内容来看,mongodb/mongoose 的 mapReduce 似乎是执行此操作的最佳方法。我一直在努力弄懂它,但除了最简单的例子之外,我很难理解它的用法,所以想知道是否有人可以帮我解释一下我的问题。我不一定需要完整的解决方案,解释清楚的伪代码我也会非常感激。我认为让我特别困惑的是如何聚合并组合 2 个或更多集合的子文档。
另外我知道这可能是由于 model/collection 设计不好,但不幸的是,这完全不在我的掌控之中,所以请不要建议改造。
我的特殊问题是我们现有的模型如下所示:
survey: {
_id: 1111,
name: "name",
questions: [
{_id: 1, text: "a,b, or c?", type: "multipleChoice", options: [a, b, c,]},
{_id: 2, text: "what do you think", type: "freeform"}
],
participants: [{_id: 1, name: "user 1"}, {_id: 2, name: "user 2"}],
results: [{_id: 123, userId: 1, questionId: 1, answer: "a"},
{_id: 124, userId: 2, questionId: 1, answer: "b"},
{_id: 125, userId: 1, questionId: 2, answer: "this is some answer"},
{_id: 126, userId: 2, questionId: 2, answer: "this is another answer"}]
}
然后我们有另一个单独开发的模型,用于跟踪用户在整个调查过程中的进度(这只是一个基本子集,我们还跟踪不同的事件)
trackings:{
_id:123,
surveyId: 1,
userId: 123,
starttime: "2015-05-13 10:46:20.347Z"
endtime: "2015-05-13 10:59:20.347Z"
}
我想做的是得到类似的东西:
{
survey: "survey name",
_id : 1,
totalAverageTime: "00:23:00",
fastestTime : "00:23:00",
slowestTime: "00:25:00",
questions: [
{
_id: 1, text: "a,b, or c?",
type: "multipleChoice",
mostPopularAnswer: "a",
averageTime: "00:13:00",
answers : [{ userId: 1, answer: "a", time:"00:14:00"},
{ userId: 2, answer: "a", time:"00:12:00"}]
},{
_id: 2, text:"what do you think",
type:"freeform",
averageTime : "00:10:00",
answers : [{ userId: 1, answer: "this is some answer", time:"00:11:00"},
{ userId: 2, answer: "this is another answer", time:"00:09:00"}]
}
]
}
以下方法使用 aggregation framework 来得出更接近所需输出的解决方案。这取决于第三个集合,可以将其视为两个集合 survey
和 trackings
之间的合并。
首先,假设您有以下集合,其中包含基于您问题中示例的测试文档:
// survey collection: a single sample survey document with embedded
// questions, participants and results arrays.
db.survey.insert({
    _id: 1111,
    name: "name",
    questions: [
        {
            _id: 1,
            text: "a,b, or c?",
            type: "multipleChoice",
            options: ["a", "b", "c"]
        },
        {
            _id: 2,
            text: "what do you think",
            type: "freeform"
        }
    ],
    participants: [
        { _id: 1, name: "user 1" },
        { _id: 2, name: "user 2" }
    ],
    // userId / questionId hold the _id values of the participants and
    // questions listed above.
    results: [
        { _id: 123, userId: 1, questionId: 1, answer: "a" },
        { _id: 124, userId: 2, questionId: 1, answer: "b" },
        { _id: 125, userId: 1, questionId: 2, answer: "this is some answer" },
        { _id: 126, userId: 2, questionId: 2, answer: "this is another answer" }
    ]
});
// trackings collection: one timing record per user for survey 1111.
// starttime / endtime are inserted as plain strings, not Date objects.
db.trackings.insert([
    {
        _id: 1,
        surveyId: 1111,
        userId: 1,
        starttime: "2015-05-13 10:46:20.347Z",
        endtime: "2015-05-13 10:59:20.347Z"
    },
    {
        _id: 2,
        surveyId: 1111,
        userId: 2,
        starttime: "2015-05-13 10:13:06.176Z",
        endtime: "2015-05-13 10:46:28.176Z"
    }
]);
要创建第三个集合(我们称之为 output_collection
),您需要使用 find()
游标的 forEach()
方法遍历 trackings
集合,将存储日期字符串的字段转换为实际的 ISODate 对象,创建一个存储 survey
结果的数组字段,然后将合并后的对象保存到第三个集合中。下面演示这个操作:
// Build output_collection: every tracking record is copied over together
// with the survey document(s) it refers to and with its timestamps
// converted from strings to ISODate values.
db.trackings.find().forEach(function (tracking) {
    // toArray() keeps the lookup result as an array, so `survey` becomes an
    // array field on the merged document.
    tracking.survey = db.survey.find({ _id: tracking.surveyId }).toArray();

    // Replace the date strings with ISODate objects.
    tracking.starttime = ISODate(tracking.starttime);
    tracking.endtime = ISODate(tracking.endtime);

    db.output_collection.save(tracking);
});
将两个集合合并为 output_collection 后,使用 db.output_collection.findOne()
查询将得到:
{
"_id" : 1,
"surveyId" : 1111,
"userId" : 1,
"starttime" : ISODate("2015-05-13T10:46:20.347Z"),
"endtime" : ISODate("2015-05-13T10:59:20.347Z"),
"survey" : [
{
"_id" : 1111,
"name" : "name",
"questions" : [
{
"_id" : 1,
"text" : "a,b, or c?",
"type" : "multipleChoice",
"options" : [
"a",
"b",
"c"
]
},
{
"_id" : 2,
"text" : "what do you think",
"type" : "freeform"
}
],
"participants" : [
{
"_id" : 1,
"name" : "user 1"
},
{
"_id" : 2,
"name" : "user 2"
}
],
"results" : [
{
"_id" : 123,
"userId" : 1,
"questionId" : 1,
"answer" : "a"
},
{
"_id" : 124,
"userId" : 2,
"questionId" : 1,
"answer" : "b"
},
{
"_id" : 125,
"userId" : 1,
"questionId" : 2,
"answer" : "this is some answer"
},
{
"_id" : 126,
"userId" : 2,
"questionId" : 2,
"answer" : "this is another answer"
}
]
}
]
}
然后您可以在此集合上应用聚合。聚合管道应包含四个 $unwind
运算符阶段,它们解构输入文档中的数组字段,为数组的每个元素输出一个文档;在每个输出文档中,该数组被替换为其中的一个元素值。
接下来的 $project
运算符阶段会重塑流中的每个文档,例如添加一个新字段 duration
,它使用算术运算符计算 starttime 和 endtime 两个日期字段之间以分钟为单位的时间差。
之后是 $group
运算符管道阶段,它按 "surveyId"
键对输入文档进行分组,并应用累加器表达式。该阶段会消耗所有输入文档,并为每个不同的分组输出一个文档。
所以你的聚合管道应该是这样的:
// Summarise output_collection into one document per survey and write the
// result to the survey_results collection.
db.output_collection.aggregate([
    // Flatten the embedded survey array and each of its nested arrays so
    // that every element ends up in a document of its own.
    { $unwind: "$survey" },
    { $unwind: "$survey.questions" },
    { $unwind: "$survey.participants" },
    { $unwind: "$survey.results" },

    // Pass the listed fields through and add `duration`:
    // (endtime - starttime) divided by 1000 * 60, i.e. minutes when the
    // date difference is expressed in milliseconds.
    {
        $project: {
            survey: 1,
            surveyId: 1,
            userId: 1,
            starttime: 1,
            endtime: 1,
            duration: {
                $divide: [
                    { $subtract: ["$endtime", "$starttime"] },
                    1000 * 60
                ]
            }
        }
    },

    // One output document per surveyId with the timing statistics and the
    // distinct questions / results seen in that group.
    {
        $group: {
            _id: "$surveyId",
            survey: { $first: "$survey.name" },
            totalAverageTime: { $avg: "$duration" },
            fastestTime: { $min: "$duration" },
            slowestTime: { $max: "$duration" },
            // $addToSet keeps each question / result only once even though
            // the unwinds repeated them across many documents.
            questions: { $addToSet: "$survey.questions" },
            answers: { $addToSet: "$survey.results" }
        }
    },

    // Persist the grouped documents.
    { $out: "survey_results" }
]);

db.survey_results.find();
输出
/* 0 */
{
"result" : [
{
"_id" : 1111,
"survey" : "name",
"totalAverageTime" : 23.18333333333334,
"fastestTime" : 13,
"slowestTime" : 33.36666666666667,
"questions" : [
{
"_id" : 2,
"text" : "what do you think",
"type" : "freeform"
},
{
"_id" : 1,
"text" : "a,b, or c?",
"type" : "multipleChoice",
"options" : [
"a",
"b",
"c"
]
}
],
"answers" : [
{
"_id" : 126,
"userId" : 2,
"questionId" : 2,
"answer" : "this is another answer"
},
{
"_id" : 124,
"userId" : 2,
"questionId" : 1,
"answer" : "b"
},
{
"_id" : 125,
"userId" : 1,
"questionId" : 2,
"answer" : "this is some answer"
},
{
"_id" : 123,
"userId" : 1,
"questionId" : 1,
"answer" : "a"
}
]
}
],
"ok" : 1
}
更新
通过 $out
聚合管道阶段将聚合结果输出到另一个集合(比如 survey_results
)之后,您可以将一些原生 JavaScript 函数与 find()
游标的 forEach()
方法结合使用,以获得最终对象:
// Reshape every summary document in survey_results: attach to each question
// the answers whose questionId equals the question's _id, then drop the
// flat top-level `answers` array and save the document back.
db.survey_results.find().forEach(function (summary) {
    var nestedQuestions = summary.questions.map(function (question) {
        var ownAnswers = summary.answers.filter(function (answer) {
            return answer.questionId === question._id;
        });

        // The link field is redundant once the answer is nested under its
        // question, so it is removed from the answer object.
        ownAnswers.forEach(function (answer) {
            delete answer.questionId;
        });

        question.answers = ownAnswers;
        return question;
    });

    delete summary.answers;
    summary.questions = nestedQuestions;
    db.survey_results.save(summary);
});
输出:
/* 0 */
{
"_id" : 1111,
"survey" : "name",
"totalAverageTime" : 23.18333333333334,
"fastestTime" : 13,
"slowestTime" : 33.36666666666667,
"questions" : [
{
"_id" : 2,
"text" : "what do you think",
"type" : "freeform",
"answers" : [
{
"_id" : 126,
"userId" : 2,
"answer" : "this is another answer"
},
{
"_id" : 125,
"userId" : 1,
"answer" : "this is some answer"
}
]
},
{
"_id" : 1,
"text" : "a,b, or c?",
"type" : "multipleChoice",
"options" : [
"a",
"b",
"c"
],
"answers" : [
{
"_id" : 124,
"userId" : 2,
"answer" : "b"
},
{
"_id" : 123,
"userId" : 1,
"answer" : "a"
}
]
}
]
}