使用弹性搜索地理功能从 XML 原始数据中查找最常见的位置

Using Elastic Search Geo Functionality To Find Most Common Locations from XML raw data

我想使用 Elastic Search 及其 Geo 功能来生成最常见位置的排序列表,其中如果位置在一天之内(例如,彼此相距 100 米),则这些位置被视为相同以周为基础。

其中许多位置将是相同的物理位置(例如用户的家),但显然经度和纬度可能不完全相同。

一周中每一天考虑的数据应该是整个数据周期(一个月)内一周中的同一天。例如,在恰好是星期二的日期搜索常见位置,我们应该查询上周二、前周二、前周二和前周二的数据(!)[也许这可以通过使用 ES 来实现指数?]。

对于每个搜索日,我还想要一个直方图,以 15 分钟的精度再次显示他们在该位置 100 米以内的所有时间,包括一周中同一天的最后 4 周的数据。

我们只能发出一天的 API 原始数据请求(因此需要多次请求才能获取过去四个星期的数据)。我们无法控制的第三方 API 将采用以下格式 return XML - 全部在一行且未格式化(我手动格式化了下面的示例)。坐标是(经度,纬度)格式。最后一位数字(下例中的 0)表示高度,应尽可能存储。

<?xml version="1.0" encoding="UTF-8"?>
<kml>
   <Document>
      [stuff we don't care about]
      <Day>
         [stuff we don't care about]
         <Locations>
        [stuff we don't care about]
            <time>2016-04-30T19:35:01.558+10:00</time>
            <coord>142.9987247 -37.328203799999996 0</coord>
            <time>2016-05-02T12:29:21.233+10:00</time>
            <coord>142.96122699999998 -37.921569999999996 0</coord>
            ....
         </Locations>
      </Day>
   </Document>
</kml>

非常感谢。

与您的 类似,可以很容易地解析给定的 XML 并将结果位置索引到 elasticsearch 中。需要进行一些 XML 解析,然后执行一些数据处理以提取数据,但这是可能的。

我在下面提出了非常简单的 Logstash 配置:

input {
  http_poller {
    urls => {
      get_locations => {
        method => get
        url => "http://your-api.com/locations.xml"
        headers => {
          Accept => "application/xml"
        }
      }
    }
    request_timeout => 60
    interval => 60
    codec => "plain"
  }
}
filter {
  # 1. parse XML
  xml {
    source => "message"
    force_array => false
    target => "parsed"
  }

  # 2. parse time/coord arrays and rebuild pairs
  ruby {
    code => "
      event['locations'] = []
      event['parsed']['Document']['Day']['Locations']['time'].each { |time|
        event['locations'].push({'time' => time, 'location' => nil})
      }
      event['parsed']['Document']['Day']['Locations']['coord'].each_with_index { |coord, i|
        event['locations'][i]['location'] = {
          'lon' => coord.split(' ')[0],
          'lat' => coord.split(' ')[1]
        }
      }
    "
  }

  # 3. produce one event per time/coord pair
  split {
    field => "locations"
  }

  # 4. Some renaming and clean-ups    
  mutate {
    rename => {
      "[locations][time]" => "timestampMs"
      "[locations][location]" => "location"
    }
    remove_field => [
      "parsed", "message", "@timestamp", "@version", "locations"
    ]
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "locations"
    document_type => "location"
  }
}

首先,我使用 http_poller 输入来提取 XML 数据

然后,我使用 xml 过滤器将 XML 解析为 JSON。您提供的 XML 将导致以下结果 JSON:

 {
    "Document" => {
        "Day" => {
            "Locations" => {
                 "time" => [
                    [0] "2016-04-30T19:35:01.558+10:00",
                    [1] "2016-05-02T12:29:21.233+10:00"
                ],
                "coord" => [
                    [0] "142.9987247 -37.328203799999996 0",
                    [1] "142.96122699999998 -37.921569999999996 0"
                ]
            }
        }
    }

如您所见,由于笨拙的 XML 组织,timecoord 值都在各自的数组中粘合在一起。

然后我利用 ruby 过滤器将其全部拆分并重新组合每个 time 及其适当的 coord 值。我基本上遍历每个数组并重新构造适当的 time/coord 对并将它们存储到新的 locations 数组中。请注意,当前版本的 Elasticsearch 中的高度部分是 not yet supported

然后我 split 那个新的 locations 数组,以便每 time/coord 对产生一个事件。

最后,我进行了一些清理,将索引到 Elasticsearch 中的事件如下所示:

{
 "timestampMs" => "2016-04-30T19:35:01.558+10:00",
    "location" => {
         "lon" => "142.9987247",
         "lat" => "-37.328203799999996"
    }
}
{
 "timestampMs" => "2016-05-02T12:29:21.233+10:00",
    "location" => {
         "lon" => "142.96122699999998",
         "lat" => "-37.921569999999996"
    }
}

然后您可以 运行 bin/logstash -f locations.conf 以便 运行 您的管道。

有了它,您就可以重用与 中相同的聚合,这将起作用。