仅聚合最新文档
Aggregate only newest document
我有一个弹性索引,其中包含用户状态历史记录的文档。数据看起来像这样;
{
"session_id": "yunus",
"state_name": "start",
"entry_time": "2016-11-09 15:27:03"
},
{
"session_id": "yunus",
"state_name": "end",
"entry_time": "2016-11-09 16:30:00"
},
{
"session_id": "can",
"state_name": "start",
"entry_time": "2016-11-09 12:01:00"
},
{
"session_id": "rick",
"state_name": "start",
"entry_time": "2016-11-09 09:00:00"
},
{
"session_id": "rick",
"state_name": "end",
"entry_time": "2016-11-10 10:00:00"
}
我想按州名称和日期直方图进行汇总,但仅针对当时相关的最后一个州。所以结果可以是;
2016-11-08
start = 0
end = 0
2016-11-09
start = 2
end = 1
2016-11-10
start = 1
end = 2
实际计划是生成带有时间线的分组条形图,以显示状态随时间的变化。
我尝试了几种方法,例如聚合管道、热门点击,但没有取得任何进展。
感谢任何帮助。
对于任何感兴趣的人,我用spark解决了它。我使用 elastic-spark 从 elasticsearch 读取然后写回 elasticsearch。
这里是 es
的读作 Rdd
;
val allData = sc.esRDD(s"states_${id}/log", query)
然后我首先按会话 ID 分组,按日期排序以仅查找会话的最新状态;
val latestStates = allData.groupBy(k => k._2.get("session_id").get).map(k => (k._2).reduceLeft((d1, d2) => {
d1._2.get("timestamp").get.asInstanceOf[Long] > d2._2.get("timestamp").get.asInstanceOf[Long] match {
case true => d1
case false => d2
}
})).map(_._2)
获得最新的会话状态后,我会过滤退出状态,然后按值计数;
val stateSummary = latestStates
.filter(s => s.isDefinedAt("state_id") && s("state_id").asInstanceOf[Long] != -1)
.map(s => (s("state_id"), s("state_name")))
.countByValue()
.map(d => Map("state_id" -> d._1._1.asInstanceOf[Long], "state_name" -> d._1._2.asInstanceOf[String], "count" -> d._2)).toList
现在我们有各州的当前会话数。 (当前是可配置的,因此我们可以将其设置为特定时间),只剩下东西了,写回 elasticsearch;
sc.makeRDD(Seq(finalElasticDoc)).saveToEs(s"states_${id}/analytic_daily")
我有一个弹性索引,其中包含用户状态历史记录的文档。数据看起来像这样;
{
"session_id": "yunus",
"state_name": "start",
"entry_time": "2016-11-09 15:27:03"
},
{
"session_id": "yunus",
"state_name": "end",
"entry_time": "2016-11-09 16:30:00"
},
{
"session_id": "can",
"state_name": "start",
"entry_time": "2016-11-09 12:01:00"
},
{
"session_id": "rick",
"state_name": "start",
"entry_time": "2016-11-09 09:00:00"
},
{
"session_id": "rick",
"state_name": "end",
"entry_time": "2016-11-10 10:00:00"
}
我想按州名称和日期直方图进行汇总,但仅针对当时相关的最后一个州。所以结果可以是;
2016-11-08
start = 0
end = 0
2016-11-09
start = 2
end = 1
2016-11-10
start = 1
end = 2
实际计划是生成带有时间线的分组条形图,以显示状态随时间的变化。
我尝试了几种方法,例如聚合管道、热门点击,但没有取得任何进展。
感谢任何帮助。
对于任何感兴趣的人,我用spark解决了它。我使用 elastic-spark 从 elasticsearch 读取然后写回 elasticsearch。
这里是 es
的读作 Rdd
;
val allData = sc.esRDD(s"states_${id}/log", query)
然后我首先按会话 ID 分组,按日期排序以仅查找会话的最新状态;
val latestStates = allData.groupBy(k => k._2.get("session_id").get).map(k => (k._2).reduceLeft((d1, d2) => {
d1._2.get("timestamp").get.asInstanceOf[Long] > d2._2.get("timestamp").get.asInstanceOf[Long] match {
case true => d1
case false => d2
}
})).map(_._2)
获得最新的会话状态后,我会过滤退出状态,然后按值计数;
val stateSummary = latestStates
.filter(s => s.isDefinedAt("state_id") && s("state_id").asInstanceOf[Long] != -1)
.map(s => (s("state_id"), s("state_name")))
.countByValue()
.map(d => Map("state_id" -> d._1._1.asInstanceOf[Long], "state_name" -> d._1._2.asInstanceOf[String], "count" -> d._2)).toList
现在我们有各州的当前会话数。 (当前是可配置的,因此我们可以将其设置为特定时间),只剩下东西了,写回 elasticsearch;
sc.makeRDD(Seq(finalElasticDoc)).saveToEs(s"states_${id}/analytic_daily")