Elasticsearch:获取汇总数据最后条目的摘要
Elasticsearch: getting summary on last entries for aggregated data
我有一个包含如下文档的 Elasticsearch 索引:
entity_id
操作
时间戳
a1
X
2021-01-01
a1
Y
2021-01-02
a1
Z
2021-01-10
b1
Z
2021-01-03
b1
Z
2021-01-05
b1
Y
2021-01-20
c1
Z
2021-01-03
c1
X
2021-01-05
c1
Y
2021-01-20
有一些实体(entity_id),每个实体都可以在不同的时间(时间戳)以各种方式(操作)更新多次。
我需要有关对每个实体执行的最后一个操作的累积信息。例如,对于这些数据,我需要以下形式的信息:X=0, Y=2, Z=1
Y=2 因为“Y”是发生在“b1”和“c1”实体上的最后一个操作
Z=1 因为“Z”是对“a1”实体进行的最后一次操作
我进行了查询以获取有关每个实体的最后一次操作的信息,如下所示:
{
"size": 0,
"aggs": {
"group_by_id": {
"terms": {
"field": "entity_id"
},
"aggs": {
"last_entry": {
"top_hits": {
"size": 1,
"_source": {
"include": [
"operation",
"timestamp"
]
},
"sort": [
{
"timestamp": {
"order": "desc"
}
}
]
}
}
}
}
}
}
它有效,但由于数据量巨大,我将无法在代码中按类型迭代聚合结果和求和操作。如果可行,我需要计算 Elasticsearch 查询中的最后操作数。
有人可以建议如何实现吗?
谢谢!
我一直在寻找解决此类任务的方法。我发现了很多类似的问题,但到目前为止还没有合适的建议。
这是我终于找到的解决方案,也许它对有类似任务的人有用。思路是使用scripted_metric聚合,通过脚本计算需要的汇总数据
{
"size": 0,
"aggs":{
"total": {
"scripted_metric": {
"init_script": "state.operations=new Hashtable();",
"map_script": <Add to state.operations every doc using entity_id as key. When another doc for the same entity_id is found check its timestamp and replace the existing doc if the new found doc is newer>,
"combine_script": "return state.operations",
"reduce_script": <Here you have "states" variable which contains hashtables returned by the combine script per each shard. You can iterate states, merge all hashtables together and return the resulting hashtable or just calculate needed summary values>
}
}
},
"sort": [
{
"timestamp": {
"order": "desc"
}
}
]
}
这只是一个算法,我在 map_script 和 combine_script 中写了简单的描述,因为我的真实案例比我在这里发布的简化示例复杂得多。
我有一个包含如下文档的 Elasticsearch 索引:
entity_id | 操作 | 时间戳 |
---|---|---|
a1 | X | 2021-01-01 |
a1 | Y | 2021-01-02 |
a1 | Z | 2021-01-10 |
b1 | Z | 2021-01-03 |
b1 | Z | 2021-01-05 |
b1 | Y | 2021-01-20 |
c1 | Z | 2021-01-03 |
c1 | X | 2021-01-05 |
c1 | Y | 2021-01-20 |
有一些实体(entity_id),每个实体都可以在不同的时间(时间戳)以各种方式(操作)更新多次。
我需要有关对每个实体执行的最后一个操作的累积信息。例如,对于这些数据,我需要以下形式的信息:X=0, Y=2, Z=1
Y=2 因为“Y”是发生在“b1”和“c1”实体上的最后一个操作
Z=1 因为“Z”是对“a1”实体进行的最后一次操作
我进行了查询以获取有关每个实体的最后一次操作的信息,如下所示:
{
"size": 0,
"aggs": {
"group_by_id": {
"terms": {
"field": "entity_id"
},
"aggs": {
"last_entry": {
"top_hits": {
"size": 1,
"_source": {
"include": [
"operation",
"timestamp"
]
},
"sort": [
{
"timestamp": {
"order": "desc"
}
}
]
}
}
}
}
}
}
它有效,但由于数据量巨大,我将无法在代码中按类型迭代聚合结果和求和操作。如果可行,我需要计算 Elasticsearch 查询中的最后操作数。
有人可以建议如何实现吗?
谢谢!
我一直在寻找解决此类任务的方法。我发现了很多类似的问题,但到目前为止还没有合适的建议。 这是我终于找到的解决方案,也许它对有类似任务的人有用。思路是使用scripted_metric聚合,通过脚本计算需要的汇总数据
{
"size": 0,
"aggs":{
"total": {
"scripted_metric": {
"init_script": "state.operations=new Hashtable();",
"map_script": <Add to state.operations every doc using entity_id as key. When another doc for the same entity_id is found check its timestamp and replace the existing doc if the new found doc is newer>,
"combine_script": "return state.operations",
"reduce_script": <Here you have "states" variable which contains hashtables returned by the combine script per each shard. You can iterate states, merge all hashtables together and return the resulting hashtable or just calculate needed summary values>
}
}
},
"sort": [
{
"timestamp": {
"order": "desc"
}
}
]
}
这只是一个算法,我在 map_script 和 combine_script 中写了简单的描述,因为我的真实案例比我在这里发布的简化示例复杂得多。