用于 Apache 日志的 logstash 自定义日志过滤器
logstash Custom Log Filter for Apache Logs
我是 ELK 堆栈的新手。我有一个 filebeat 服务将日志发送到 logstash,在使用 grok
过滤器的 logstash 中,数据被推送到 elasticsearch
索引。
我正在使用 gork
过滤器和 match => { "message" => "%{COMBINEDAPACHELOG}"}
来解析数据。
我的问题是,我希望将字段的名称及其值存储在 elasticsearch 索引中。我的不同版本的日志如下:
27.60.18.21 - - [27/Aug/2017:10:28:49 +0530] "GET /api/v1.2/places/search/json?username=pradeep.pgu&location=28.5359586,77.3677936&query=atm&explain=true&bridge=true HTTP/1.1" 200 3284
27.60.18.21 - - [27/Aug/2017:10:28:49 +0530] "GET /api/v1.2/places/search/json?username=pradeep.pgu&location=28.5359586,77.3677936&query=atms&explain=true&bridge=true HTTP/1.1" 200 1452
27.60.18.21 - - [27/Aug/2017:10:28:52 +0530] "GET /api/v1.2/places/nearby/json?&refLocation=28.5359586,77.3677936&keyword=FINATM HTTP/1.1" 200 3283
27.60.18.21 - - [27/Aug/2017:10:29:06 +0530] "GET /api/v1.2/places/search/json?username=pradeep.pgu&location=28.5359586,77.3677936&query=co&explain=true&bridge=true HTTP/1.1" 200 3415
27.60.18.21 - - [27/Aug/2017:10:29:06 +0530] "GET /api/v1.2/places/search/json?username=pradeep.pgu&location=28.5359586,77.3677936&query=cof&explain=true&bridge HTTP/1.1" 200 2476
我想要的弹性索引字段如下:
- client_ip => 类型必须与 kibana 用于 IP 映射的类型兼容。
- 时间戳 => 日期时间格式。 => 日志的时间
- method => text => 被调用的方法,例如得到,POST
- 版本 => 十进制数 => 例如1.2 / 1.0(在示例日志中为 v1.2)
- 用户名 => 文本 => 这是
username=
之后的文本(在示例日志中为 pradeep.pgu)
- location =>geo_point type => 该值同时具有纬度和经度,以便 kibana 可以在地图上绘制它们。
- search_query => 文本 => 搜索的内容(在来自两个字段 "keyword=" 或 "query=" 之一的示例中)。两个字段中的任何一个都将存在,而存在的那个,必须使用它的值。
- response_code => 数字 => 响应代码。 (样本中为 200)
- data_transfered => 数字 => 传输的数据量(示例中的最后一个数字)。
这样的事情有可能吗? gork 过滤器有这方面的规定吗?问题是参数不是特定于顺序的。
从 HTTPD_COMMONLOG
开始,您可以使用此模式(您可以在 grok tester 测试):
grok {
match => {
"message" => "%{IPORHOST:client_ip} %{HTTPDUSER:ident} %{HTTPDUSER:auth} \[%{HTTPDATE:timestamp}\] \"(?:%{WORD:method} /api/v%{NUMBER:version}/places/search/json\?%{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})\" %{NUMBER:response_code} (?:%{NUMBER:data_transfered}|-)"
}
}
一旦 grok 过滤器提取了请求,您可以在其上使用 kv 过滤器,它将提取参数(并忽略参数不是特定于顺序的问题)。您必须将 field_split
选项设置为 &:
kv {
source => "request"
field_split => "&"
}
对于 search_query
,根据存在的字段,我们使用带有 add_field
选项的 mutate
过滤器来创建字段。
filter {
grok {
match => {
"message" => "%{IPORHOST:client_ip} %{HTTPDUSER:ident} %{HTTPDUSER:auth} \[%{HTTPDATE:timestamp}\] \"(?:%{WORD:method} /api/v%{NUMBER:version}/.*/json\?%{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})\" %{NUMBER:response_code} (?:%{NUMBER:data_transfered}|-)"
}
}
kv {
source => "request"
field_split => "&"
}
if [query] {
mutate {
add_field => { "search_query" => "%{query}" }
}
} else if [keyword] {
mutate {
add_field => { "search_query" => "%{keyword}" }
}
}
if [refLocation] {
mutate {
rename => { "refLocation" => "location" }
}
}
}
我是 ELK 堆栈的新手。我有一个 filebeat 服务将日志发送到 logstash,在使用 grok
过滤器的 logstash 中,数据被推送到 elasticsearch
索引。
我正在使用 gork
过滤器和 match => { "message" => "%{COMBINEDAPACHELOG}"}
来解析数据。
我的问题是,我希望将字段的名称及其值存储在 elasticsearch 索引中。我的不同版本的日志如下:
27.60.18.21 - - [27/Aug/2017:10:28:49 +0530] "GET /api/v1.2/places/search/json?username=pradeep.pgu&location=28.5359586,77.3677936&query=atm&explain=true&bridge=true HTTP/1.1" 200 3284
27.60.18.21 - - [27/Aug/2017:10:28:49 +0530] "GET /api/v1.2/places/search/json?username=pradeep.pgu&location=28.5359586,77.3677936&query=atms&explain=true&bridge=true HTTP/1.1" 200 1452
27.60.18.21 - - [27/Aug/2017:10:28:52 +0530] "GET /api/v1.2/places/nearby/json?&refLocation=28.5359586,77.3677936&keyword=FINATM HTTP/1.1" 200 3283
27.60.18.21 - - [27/Aug/2017:10:29:06 +0530] "GET /api/v1.2/places/search/json?username=pradeep.pgu&location=28.5359586,77.3677936&query=co&explain=true&bridge=true HTTP/1.1" 200 3415
27.60.18.21 - - [27/Aug/2017:10:29:06 +0530] "GET /api/v1.2/places/search/json?username=pradeep.pgu&location=28.5359586,77.3677936&query=cof&explain=true&bridge HTTP/1.1" 200 2476
我想要的弹性索引字段如下:
- client_ip => 类型必须与 kibana 用于 IP 映射的类型兼容。
- 时间戳 => 日期时间格式。 => 日志的时间
- method => text => 被调用的方法,例如得到,POST
- 版本 => 十进制数 => 例如1.2 / 1.0(在示例日志中为 v1.2)
- 用户名 => 文本 => 这是
username=
之后的文本(在示例日志中为 pradeep.pgu) - location =>geo_point type => 该值同时具有纬度和经度,以便 kibana 可以在地图上绘制它们。
- search_query => 文本 => 搜索的内容(在来自两个字段 "keyword=" 或 "query=" 之一的示例中)。两个字段中的任何一个都将存在,而存在的那个,必须使用它的值。
- response_code => 数字 => 响应代码。 (样本中为 200)
- data_transfered => 数字 => 传输的数据量(示例中的最后一个数字)。
这样的事情有可能吗? gork 过滤器有这方面的规定吗?问题是参数不是特定于顺序的。
从 HTTPD_COMMONLOG
开始,您可以使用此模式(您可以在 grok tester 测试):
grok {
match => {
"message" => "%{IPORHOST:client_ip} %{HTTPDUSER:ident} %{HTTPDUSER:auth} \[%{HTTPDATE:timestamp}\] \"(?:%{WORD:method} /api/v%{NUMBER:version}/places/search/json\?%{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})\" %{NUMBER:response_code} (?:%{NUMBER:data_transfered}|-)"
}
}
一旦 grok 过滤器提取了请求,您可以在其上使用 kv 过滤器,它将提取参数(并忽略参数不是特定于顺序的问题)。您必须将 field_split
选项设置为 &:
kv {
source => "request"
field_split => "&"
}
对于 search_query
,根据存在的字段,我们使用带有 add_field
选项的 mutate
过滤器来创建字段。
filter {
grok {
match => {
"message" => "%{IPORHOST:client_ip} %{HTTPDUSER:ident} %{HTTPDUSER:auth} \[%{HTTPDATE:timestamp}\] \"(?:%{WORD:method} /api/v%{NUMBER:version}/.*/json\?%{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})\" %{NUMBER:response_code} (?:%{NUMBER:data_transfered}|-)"
}
}
kv {
source => "request"
field_split => "&"
}
if [query] {
mutate {
add_field => { "search_query" => "%{query}" }
}
} else if [keyword] {
mutate {
add_field => { "search_query" => "%{keyword}" }
}
}
if [refLocation] {
mutate {
rename => { "refLocation" => "location" }
}
}
}