Log aggregation in Elasticsearch

I am using Elasticsearch 7.8, and the entries in my index look like the following:

{"_id" : 1,"sourceip":"1.1.1.1", "data" : "this is a sample input", "processedflag" : true}
{"_id" : 2,"sourceip":"1.1.1.1", "data" : "this is a sample input", "processedflag" : false}
{"_id" : 3,"sourceip":"1.1.1.1", "data" : "this is an another input", "processedflag" : false}
{"_id" : 4,"sourceip":"1.1.1.2", "data" : "this is a sample input", "processedflag" : false}

Now, for sourceip:1.1.1.1, I want to aggregate and find duplicates of the "data" field.
For example, in the case above, I want to get the _id of entries 1 and 2, since their data matches.

Thanks,
Harry

Looking at your data, I have considered only the first three fields, and based on them created the mapping, sample documents, aggregation query, and response below.

Mapping:

PUT my_ip_index
{
  "mappings": {
    "properties": {
      "id": {
        "type": "keyword"
      },
      "sourceip":{
        "type": "ip"
      },
      "data":{            
        "type": "keyword"              <----- Notice this though
      }
    }
  }
}
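
That is also why the arrow points at data being a keyword: a terms aggregation needs a keyword (or other aggregatable) field. If your existing index was created by dynamic mapping instead, Elasticsearch by default maps strings as text with a keyword sub-field, so in the aggregation query further below you would reference data.keyword instead of data, e.g.:

"terms": {
  "field": "data.keyword",            <---- keyword sub-field from default dynamic mapping
  "min_doc_count": 2
}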

Sample documents:

POST my_ip_index/_doc/1
{
  "id": 1,
  "sourceip": "1.1.1.1",
  "data": "this is a sample input"
}

POST my_ip_index/_doc/2
{
  "id": 2,
  "sourceip": "1.1.1.1",
  "data": "this is a sample input"
}

POST my_ip_index/_doc/3
{
  "id": 3,
  "sourceip": "1.1.1.1",
  "data": "this is an another input"
}

POST my_ip_index/_doc/4
{
  "id": 4,
  "sourceip": "1.1.1.2",
  "data": "this is a sample input"
}

POST my_ip_index/_doc/5
{
  "id": 5,
  "sourceip": "1.1.1.2",
  "data": "this is a sample another input"
}

Only the first two documents are duplicates, i.e. they have both the same sourceip and the same data.
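
If it is more convenient, the same five documents can also be indexed in a single request with the _bulk API; an equivalent sketch:

POST my_ip_index/_bulk
{ "index": { "_id": "1" } }
{ "id": 1, "sourceip": "1.1.1.1", "data": "this is a sample input" }
{ "index": { "_id": "2" } }
{ "id": 2, "sourceip": "1.1.1.1", "data": "this is a sample input" }
{ "index": { "_id": "3" } }
{ "id": 3, "sourceip": "1.1.1.1", "data": "this is an another input" }
{ "index": { "_id": "4" } }
{ "id": 4, "sourceip": "1.1.1.2", "data": "this is a sample input" }
{ "index": { "_id": "5" } }
{ "id": 5, "sourceip": "1.1.1.2", "data": "this is a sample another input" }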

Aggregation query:

POST my_ip_index/_search
{
  "size": 0,
  "aggs": {
    "my_ip_address": {
      "terms": {
        "field": "sourceip",
        "min_doc_count": 2                          <---- Note this
      },
      "aggs": {
        "my_data": {
          "terms": {
            "field": "data",
            "min_doc_count": 2                      <---- Note this
          },
          "aggs": {
            "my_duplicate_ids":{
              "terms": {
                "field": "id",
                "size": 10
              }
            }
          }
        },
        "min_bucket_selector": {
          "bucket_selector": {
            "buckets_path": {
              "count": "my_data._bucket_count" 
            },
            "script": {
              "source": "params.count > 0"
            }
          }
        }
      }
    }
  }
}

Note that I've made use of two kinds of aggregation here, terms aggregations and a bucket_selector pipeline aggregation, and pay special attention to the structure: a terms aggregation on sourceip, inside it a terms sub-aggregation on data (its min_doc_count: 2 keeps only data values that occur at least twice for that IP), and inside that a further terms sub-aggregation on id collecting the matching ids.

Also note how I've made use of the special _bucket_count path in the buckets_path of the bucket_selector aggregation: it removes every sourceip bucket whose my_data aggregation ends up with no buckets at all, i.e. IPs with no duplicated data. That is why 1.1.1.2, which has two documents but with two different data values, does not appear in the response below.
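
If you need the documents themselves rather than just the indexed id values (for example their _source or their real _id), the innermost terms aggregation could be swapped for a top_hits aggregation. A minimal sketch of just that inner block, with my_duplicate_docs as an illustrative name; the response shown next is still for the original query:

"my_duplicate_docs": {
  "top_hits": {
    "size": 10,                       <---- up to 10 duplicate docs per data bucket
    "_source": [ "id" ]               <---- return only the id field; adjust as needed
  }
}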

Response:

{
  "took" : 0,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 5,
      "relation" : "eq"
    },
    "max_score" : null,
    "hits" : [ ]
  },
  "aggregations" : {
    "my_ip_address" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : "1.1.1.1",                          <---- IP
          "doc_count" : 3,
          "my_data" : {
            "doc_count_error_upper_bound" : 0,
            "sum_other_doc_count" : 0,
            "buckets" : [
              {
                "key" : "this is a sample input",     <---- data
                "doc_count" : 2,
                "my_duplicate_ids" : {
                  "doc_count_error_upper_bound" : 0,
                  "sum_other_doc_count" : 0,
                  "buckets" : [
                    {
                      "key" : "1",                    <---- id you are looking for
                      "doc_count" : 1
                    },
                    {
                      "key" : "2",                    <---- id you are looking for
                      "doc_count" : 1
                    }
                  ]
                }
              }
            ]
          }
        }
      ]
    }
  }
}
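
Finally, since your question is about sourceip:1.1.1.1 specifically, you could also restrict the search itself to that IP with a term query; the outer sourceip terms aggregation and the bucket_selector then become unnecessary. A sketch of this simplified variant:

POST my_ip_index/_search
{
  "size": 0,
  "query": {
    "term": {
      "sourceip": "1.1.1.1"           <---- aggregate over this IP's documents only
    }
  },
  "aggs": {
    "my_data": {
      "terms": {
        "field": "data",
        "min_doc_count": 2            <---- keep only duplicated data values
      },
      "aggs": {
        "my_duplicate_ids": {
          "terms": {
            "field": "id",
            "size": 10
          }
        }
      }
    }
  }
}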

Hope this helps!