聚合时间戳周期的不同状态
Aggregate different states for timestamp periods
我正在使用 mongoDB 并需要查询已初始化、正在执行并完成一段时间(每小时或每月 ...)的搜索 nº 操作
我的 json 文档具有以下结构:
{
"_id" : ObjectId("55263d62c63265b9bb138551"),
"timestamp" : ISODate("2015-02-12T15:27:48.546Z"),
"duration" : 199821
}
timestamp 字段是进程初始化开始,duration 是执行时间(以毫秒为单位)。如果我添加时间戳+持续时间=完成时间戳
我可以使用此查询对一段时间内的操作次数(10 分钟)进行分组:
db.test.aggregate([
{ "$match" :{
"timestamp":{ "$gte": ISODate("2015-0427T12:00:00.0Z") }
}},
{ "$group" :{
"_id": {
"dayOfMonth":{ "$dayOfMonth": "$timestamp" },
"month":{ "$month": "$timestamp" },
"hour": { "$hour":"$timestamp" },
"time": {
"$subtract": [
{ "$minute":"$timestamp" },
{ "$mod": [{ "$minute": "$timestamp" }, 10] }
]
}
},
"count":{ "$sum":1 }
}},
{ "$sort": { "_id.time": 1 } }
])
但我还需要 "in execution" 和 "finised" 的数量。
我尝试使用 mapreduce 和其他聚合查询,但我无法获得类似的结果:
{
_id: {
"month" : 03,
"minute" : 00,
"Initialized" : 6,
"InExecution" : 10,
"Finished": 5
}
_id: {
"month" : 03,
"minute" : 10,
"Initialized" : 4,
"InExecution" : 12,
"Finished": 4
}
_id: {
"month" : 03,
"minute" : 20,
"Initialized" : 3,
"InExecution" : 8,
"Finished": 5
}
}
这是一个很难理解的问题,但是 "aggregation framework" 这里有一个主要问题,主要是您的 "activities" 生活在几个不同的时间间隔,具体取决于它的当前状态。
这意味着总是有一个 "start" 和一个 "finish" 它所属的间隔以及可能 "several" 可以考虑任务的间隔 "in execution".
聚合框架无法一次真正做到这一点。但是你可以用 mapReduce 做到这一点:
db.test.mapReduce(
function() {
// Work out time values
var finished = this.timestamp.valueOf() + this.duration,
finishedInterval = finished -
( finished % ( 1000 * 60 * 10 ) ),
interval = this.timestamp.valueOf() -
( this.timestamp.valueOf() % ( 1000 * 60 * 10 ) );
// Emit initialized
emit(
{
"year": new Date(interval).getUTCFullYear(),
"month": new Date(interval).getUTCMonth()+1,
"day": new Date(interval).getUTCDate(),
"hour": new Date(interval).getUTCHours(),
"minute": new Date(interval).getUTCMinutes()
},
{
"Initialized": 1,
"InExecution": 0,
"Finshed": 0
}
);
// Emit finished
emit(
{
"year": new Date(finishedInterval).getUTCFullYear(),
"month": new Date(finishedInterval).getUTCMonth()+1,
"day": new Date(finishedInterval).getUTCDate(),
"hour": new Date(finishedInterval).getUTCHours(),
"minute": new Date(finsihedInterval).getUTCMinutes()
},
{
"Initialized": 0,
"InExecution": 0,
"Finshed": 1
}
);
// Emit In execution for every 10 minute interval until finished
if ( ( interval + ( 1000 * 60 * 10 ) ) < finishedInterval ) {
for ( var x = interval; x<finishedInterval; x+= ( 1000 * 60 * 10 ) ) {
emit(
{
"year": new Date(x).getUTCFullYear(),
"month": new Date(x).getUTCMonth()+1,
"day": new Date(x).getUTCDate(),
"hour": new Date(x).getUTCHours(),
"minute": new Date(x).getUTCMinutes()
},
{
"Initialized": 0,
"InExecution": 1,
"Finshed": 0
}
);
}
}
},
function(key,values) {
var result = { "Initialized": 0, "InExecution": 0, "Finshed": 0 };
values.forEach(function(value) {
Object.keys(value).forEach(function(key) {
result[key] += value[key];
});
});
return result;
},
{
"out": { "inline": 1 },
"query": { "timestamp": { "$gte": new Date("2015-04-27T12:00:00Z") } }
}
)
如您所见,大部分工作都是在映射器中完成的。这基本上可以计算出任务 "started" 和 "ended" 所在的时间间隔,并为此发出适当的数据。
然后当然是从任务的 "start" 间隔开始,每 10 分钟间隔发出一个 "in execution" 计数,而该值小于任务的 "end" 间隔任务。
减速器简单地获取每个间隔的所有发射计数并将它们相加。所以这是一个非常简单的操作。
map 和 reduce 逻辑是合理的,但查询选择逻辑存在问题,作业 "finshing" 或 "in execution" 可能会在第一次查询时间之前启动。
为了做到这一点,您需要修复查询选择以考虑这一点,并且由于您没有存储 "finish" 时间,因此您需要计算它,这意味着 JavaScript 评估在 $where
:
的查询中
{
"out": { "inline": 1 },
"query": {
"$where": function() {
return (this.timestamp >= new Date("2015-04-27T12:00:00Z") ||
new Date(this.timestamp.valueOf() + this.duration) >=
new Date("2015-04-27T12:00:00Z"))
}
}
}
这会拾取在查询开始时间或当时结束之前仍然 运行 的项目。
这不是很好,因为它会扫描集合,所以最好将 "finshed" 作为数据中的一个值包含在内,以使查询选择更容易:
{
"out": { "inline": 1 },
"query": {
"$or": [
{ "timestamp": { "$gte": new Date("2015-04-27T12:00:00Z") } },
{ "finished": { "$gte": new Date("2015-04-27T12:00:00Z") } }
]
}
}
可以使用 "index" 并且速度更快。
作为最后一件事,仍然会发出值 "before" 这里的 "timestamp" 过滤器值,因为任何一种形式的 "finished" 都意味着在此时间之前开始的任务。出于同样的原因,在查询条件和逻辑上放置 "end" 时间也是一个好主意。
为此,再次更改选项块以包含要在执行逻辑中使用的 "scope" 变量,并添加到 "query" 条件:
{
"out": { "inline": 1 },
"query": {
"$or": [
{
"timestamp": {
"$gte": new Date("2015-04-27T12:00:00Z"),
"$lt": new Date("2015-04-28T12:00:00Z")
}
},
{
"finished": {
"$gte": new Date("2015-04-27T12:00:00Z"),
"$lt": new Date("2015-04-28T12:00:00Z")
}
}
]
},
"scope": {
"start": new Date("2015-04-27T12:00:00Z"),
"finsh": new Date("2015-04-28T12:00:00Z")
}
}
然后在每个发射周围添加条件,首先是 started where "interval" is greater than "start":
// Emit initialized
if ( interval >= start.valueOf() ) {
emit(
并找到 "finishedInterval" 小于 "finish" 的地方:
// Emit finished
if ( finishedInterval <= finish.valueOf() ) {
emit(
然后也限制 "in execution" 上的循环:
// Emit In execution for every 10 minute interval until finished
if ( ( interval + ( 1000 * 60 * 10 ) ) < finishedInterval ) {
for ( var x = interval; (( x<finishedInterval ) && ( x<finish.valueOf() )); x+= ( 1000 * 60 * 10 ) ) {
if ( x > start.valueOf() ) {
emit(
这为您提供了一个清晰的起点和终点,同时将所有可能的统计信息都列在结果中。
非常感谢布雷克,
为了你的大兴趣。我一直在研究你的解决方案并认为是个好主意。
我找到了聚合框架的可能解决方案。
db.getCollection('test').aggregate(
{$match:{
"timestamp":{$exists: true, "$gte": ISODate("2015-03-27T12:00:00.0Z") },
} },
{ $project: {
_id: 1,
timestamp : 1,
error: {$cond: [{$eq: ["$severidad", "ERROR"]}, 1, 0]},
init: {$cond: [ {$and : [{$eq: [{"$subtract": [
{"$minute":"$timestamp"},
{"$mod": [{"$minute":"$timestamp"}, 10]}
]}, {"$subtract": [
{"$minute":"$timestamp"},
{"$mod": [{"$minute":"$timestamp"}, 10]}
]}]}, {$ne: ["$severidad", "ERROR"]}]}, 1, 0]},
executing: {$cond: [ {$and : [{$gt: [{"$subtract": [
{"$minute":{ $add: [ "$timestamp", "$datos_aplicacion.duracion"]}},
{"$mod": [{"$minute":{ $add: [ "$timestamp", "$datos_aplicacion.duracion"]}}, 10]}
]}, {"$subtract": [
{"$minute":"$timestamp"},
{"$mod": [{"$minute":"$timestamp"}, 10]}
]}]}, {$ne: ["$severidad", "ERROR"]}]}, 1, 0]},
finished: {$cond: [ {$and : [{$eq: [{"$subtract": [
{"$minute":{ $add: [ "$timestamp", "$datos_aplicacion.duracion"]}},
{"$mod": [{"$minute":{ $add: [ "$timestamp", "$datos_aplicacion.duracion"]}}, 10]}
]}, {"$subtract": [
{"$minute":"$timestamp"},
{"$mod": [{"$minute":"$timestamp"}, 10]}
]}]}, {$ne: ["$severidad", "ERROR"]}]}, 1, 0]},
}},
{$group :{_id: {
dayOfMonth:{"$dayOfMonth":"$timestamp"}, month:{"$month":"$timestamp"}, hour:{"$hour":"$timestamp"} ,
time: {
"$subtract": [
{"$minute":"$timestamp"},
{"$mod": [{"$minute":"$timestamp"}, 10]}
]
},
},
NumError: {$sum:"$error"},
NumInit:{$sum:"$init"},
NumExecuting:{$sum:"$executing"},
NumFinished:{$sum:"$finished"}
}},
{ $sort : { "_id": 1} });
1M 条记录需要 1.2 秒
此致,
我正在使用 mongoDB 并需要查询已初始化、正在执行并完成一段时间(每小时或每月 ...)的搜索 nº 操作
我的 json 文档具有以下结构:
{
"_id" : ObjectId("55263d62c63265b9bb138551"),
"timestamp" : ISODate("2015-02-12T15:27:48.546Z"),
"duration" : 199821
}
timestamp 字段是进程初始化开始,duration 是执行时间(以毫秒为单位)。如果我添加时间戳+持续时间=完成时间戳
我可以使用此查询对一段时间内的操作次数(10 分钟)进行分组:
db.test.aggregate([
{ "$match" :{
"timestamp":{ "$gte": ISODate("2015-0427T12:00:00.0Z") }
}},
{ "$group" :{
"_id": {
"dayOfMonth":{ "$dayOfMonth": "$timestamp" },
"month":{ "$month": "$timestamp" },
"hour": { "$hour":"$timestamp" },
"time": {
"$subtract": [
{ "$minute":"$timestamp" },
{ "$mod": [{ "$minute": "$timestamp" }, 10] }
]
}
},
"count":{ "$sum":1 }
}},
{ "$sort": { "_id.time": 1 } }
])
但我还需要 "in execution" 和 "finised" 的数量。
我尝试使用 mapreduce 和其他聚合查询,但我无法获得类似的结果:
{
_id: {
"month" : 03,
"minute" : 00,
"Initialized" : 6,
"InExecution" : 10,
"Finished": 5
}
_id: {
"month" : 03,
"minute" : 10,
"Initialized" : 4,
"InExecution" : 12,
"Finished": 4
}
_id: {
"month" : 03,
"minute" : 20,
"Initialized" : 3,
"InExecution" : 8,
"Finished": 5
}
}
这是一个很难理解的问题,但是 "aggregation framework" 这里有一个主要问题,主要是您的 "activities" 生活在几个不同的时间间隔,具体取决于它的当前状态。
这意味着总是有一个 "start" 和一个 "finish" 它所属的间隔以及可能 "several" 可以考虑任务的间隔 "in execution".
聚合框架无法一次真正做到这一点。但是你可以用 mapReduce 做到这一点:
db.test.mapReduce(
function() {
// Work out time values
var finished = this.timestamp.valueOf() + this.duration,
finishedInterval = finished -
( finished % ( 1000 * 60 * 10 ) ),
interval = this.timestamp.valueOf() -
( this.timestamp.valueOf() % ( 1000 * 60 * 10 ) );
// Emit initialized
emit(
{
"year": new Date(interval).getUTCFullYear(),
"month": new Date(interval).getUTCMonth()+1,
"day": new Date(interval).getUTCDate(),
"hour": new Date(interval).getUTCHours(),
"minute": new Date(interval).getUTCMinutes()
},
{
"Initialized": 1,
"InExecution": 0,
"Finshed": 0
}
);
// Emit finished
emit(
{
"year": new Date(finishedInterval).getUTCFullYear(),
"month": new Date(finishedInterval).getUTCMonth()+1,
"day": new Date(finishedInterval).getUTCDate(),
"hour": new Date(finishedInterval).getUTCHours(),
"minute": new Date(finsihedInterval).getUTCMinutes()
},
{
"Initialized": 0,
"InExecution": 0,
"Finshed": 1
}
);
// Emit In execution for every 10 minute interval until finished
if ( ( interval + ( 1000 * 60 * 10 ) ) < finishedInterval ) {
for ( var x = interval; x<finishedInterval; x+= ( 1000 * 60 * 10 ) ) {
emit(
{
"year": new Date(x).getUTCFullYear(),
"month": new Date(x).getUTCMonth()+1,
"day": new Date(x).getUTCDate(),
"hour": new Date(x).getUTCHours(),
"minute": new Date(x).getUTCMinutes()
},
{
"Initialized": 0,
"InExecution": 1,
"Finshed": 0
}
);
}
}
},
function(key,values) {
var result = { "Initialized": 0, "InExecution": 0, "Finshed": 0 };
values.forEach(function(value) {
Object.keys(value).forEach(function(key) {
result[key] += value[key];
});
});
return result;
},
{
"out": { "inline": 1 },
"query": { "timestamp": { "$gte": new Date("2015-04-27T12:00:00Z") } }
}
)
如您所见,大部分工作都是在映射器中完成的。这基本上可以计算出任务 "started" 和 "ended" 所在的时间间隔,并为此发出适当的数据。
然后当然是从任务的 "start" 间隔开始,每 10 分钟间隔发出一个 "in execution" 计数,而该值小于任务的 "end" 间隔任务。
减速器简单地获取每个间隔的所有发射计数并将它们相加。所以这是一个非常简单的操作。
map 和 reduce 逻辑是合理的,但查询选择逻辑存在问题,作业 "finshing" 或 "in execution" 可能会在第一次查询时间之前启动。
为了做到这一点,您需要修复查询选择以考虑这一点,并且由于您没有存储 "finish" 时间,因此您需要计算它,这意味着 JavaScript 评估在 $where
:
{
"out": { "inline": 1 },
"query": {
"$where": function() {
return (this.timestamp >= new Date("2015-04-27T12:00:00Z") ||
new Date(this.timestamp.valueOf() + this.duration) >=
new Date("2015-04-27T12:00:00Z"))
}
}
}
这会拾取在查询开始时间或当时结束之前仍然 运行 的项目。
这不是很好,因为它会扫描集合,所以最好将 "finshed" 作为数据中的一个值包含在内,以使查询选择更容易:
{
"out": { "inline": 1 },
"query": {
"$or": [
{ "timestamp": { "$gte": new Date("2015-04-27T12:00:00Z") } },
{ "finished": { "$gte": new Date("2015-04-27T12:00:00Z") } }
]
}
}
可以使用 "index" 并且速度更快。
作为最后一件事,仍然会发出值 "before" 这里的 "timestamp" 过滤器值,因为任何一种形式的 "finished" 都意味着在此时间之前开始的任务。出于同样的原因,在查询条件和逻辑上放置 "end" 时间也是一个好主意。
为此,再次更改选项块以包含要在执行逻辑中使用的 "scope" 变量,并添加到 "query" 条件:
{
"out": { "inline": 1 },
"query": {
"$or": [
{
"timestamp": {
"$gte": new Date("2015-04-27T12:00:00Z"),
"$lt": new Date("2015-04-28T12:00:00Z")
}
},
{
"finished": {
"$gte": new Date("2015-04-27T12:00:00Z"),
"$lt": new Date("2015-04-28T12:00:00Z")
}
}
]
},
"scope": {
"start": new Date("2015-04-27T12:00:00Z"),
"finsh": new Date("2015-04-28T12:00:00Z")
}
}
然后在每个发射周围添加条件,首先是 started where "interval" is greater than "start":
// Emit initialized
if ( interval >= start.valueOf() ) {
emit(
并找到 "finishedInterval" 小于 "finish" 的地方:
// Emit finished
if ( finishedInterval <= finish.valueOf() ) {
emit(
然后也限制 "in execution" 上的循环:
// Emit In execution for every 10 minute interval until finished
if ( ( interval + ( 1000 * 60 * 10 ) ) < finishedInterval ) {
for ( var x = interval; (( x<finishedInterval ) && ( x<finish.valueOf() )); x+= ( 1000 * 60 * 10 ) ) {
if ( x > start.valueOf() ) {
emit(
这为您提供了一个清晰的起点和终点,同时将所有可能的统计信息都列在结果中。
非常感谢布雷克,
为了你的大兴趣。我一直在研究你的解决方案并认为是个好主意。
我找到了聚合框架的可能解决方案。
db.getCollection('test').aggregate(
{$match:{
"timestamp":{$exists: true, "$gte": ISODate("2015-03-27T12:00:00.0Z") },
} },
{ $project: {
_id: 1,
timestamp : 1,
error: {$cond: [{$eq: ["$severidad", "ERROR"]}, 1, 0]},
init: {$cond: [ {$and : [{$eq: [{"$subtract": [
{"$minute":"$timestamp"},
{"$mod": [{"$minute":"$timestamp"}, 10]}
]}, {"$subtract": [
{"$minute":"$timestamp"},
{"$mod": [{"$minute":"$timestamp"}, 10]}
]}]}, {$ne: ["$severidad", "ERROR"]}]}, 1, 0]},
executing: {$cond: [ {$and : [{$gt: [{"$subtract": [
{"$minute":{ $add: [ "$timestamp", "$datos_aplicacion.duracion"]}},
{"$mod": [{"$minute":{ $add: [ "$timestamp", "$datos_aplicacion.duracion"]}}, 10]}
]}, {"$subtract": [
{"$minute":"$timestamp"},
{"$mod": [{"$minute":"$timestamp"}, 10]}
]}]}, {$ne: ["$severidad", "ERROR"]}]}, 1, 0]},
finished: {$cond: [ {$and : [{$eq: [{"$subtract": [
{"$minute":{ $add: [ "$timestamp", "$datos_aplicacion.duracion"]}},
{"$mod": [{"$minute":{ $add: [ "$timestamp", "$datos_aplicacion.duracion"]}}, 10]}
]}, {"$subtract": [
{"$minute":"$timestamp"},
{"$mod": [{"$minute":"$timestamp"}, 10]}
]}]}, {$ne: ["$severidad", "ERROR"]}]}, 1, 0]},
}},
{$group :{_id: {
dayOfMonth:{"$dayOfMonth":"$timestamp"}, month:{"$month":"$timestamp"}, hour:{"$hour":"$timestamp"} ,
time: {
"$subtract": [
{"$minute":"$timestamp"},
{"$mod": [{"$minute":"$timestamp"}, 10]}
]
},
},
NumError: {$sum:"$error"},
NumInit:{$sum:"$init"},
NumExecuting:{$sum:"$executing"},
NumFinished:{$sum:"$finished"}
}},
{ $sort : { "_id": 1} });
1M 条记录需要 1.2 秒
此致,