过滤数据时 logstash grok 问题
logstash grok issue while filtering data
我有一个数据,基本上是通过 rm 命令删除数据,如下所示。
ttmv516,19/05/21,03:59,00-mins,dvcm,dvcm 166820 4.1 0.0 4212 736 ? DN 03:59 0:01 rm -rf /dv/project/agile/mce_dev_folic/test/install.asan/install,/dv/svgwwt/commander/workspace4/dvfcronrun_IL-SFV-RHEL6.5-K4_kinite_agile_invoke_dvfcronrun_at_given_site_50322
我在下面使用 logstash grok,它工作正常,但直到最近我看到两个奇怪的问题 1) _grokparsefailure
另一个 2) Hostname Field
没有正确显示,即它的初始字符是不像 ttmv516
那样会出现 mv516
.
%{HOSTNAME:Hostname},%{DATE:Date},%{HOUR:dt_h}:%{MINUTE:dt_m},%{NUMBER:duration}-%{WORD:hm},%{USER:User},%{USER:User_1} %{NUMBER:Pid} %{NUMBER:float} %{NUMBER:float} %{NUMBER:Num_1} %{NUMBER:Num_2} %{DATA} (?:%{HOUR:dt_h1}:|)(?:%{MINUTE:dt_m1}|) (?:%{HOUR:dt_h2}:|)(?:%{MINUTE:dt_m2}|)%{GREEDYDATA:CMD},%{GREEDYDATA:PWD_PATH}
但是,在 Kibana 数据中使用 grok 调试器进行的测试显示正确。
我的logstash文件如下
cat /etc/logstash/conf.d/rmlog.conf
input {
file {
path => [ "/data/rm_logs/*.txt" ]
start_position => beginning
sincedb_path => "/data/registry-1"
max_open_files => 64000
type => "rmlog"
}
}
filter {
if [type] == "rmlog" {
grok {
match => { "message" => "%{HOSTNAME:Hostname},%{DATE:Date},%{HOUR:dt_h}:%{MINUTE:dt_m},%{NUMBER:duration}-%{WORD:hm},%{USER:User},%{USER:User_1} %{NUMBER:Pid} %{NUMBER:float} %{NUMBER:float} %{NUMBER:Num_1} %{NUMBER:Num_2} %{DATA} (?:%{HOUR:dt_h1}:|)(?:%{MINUTE:dt_m1}|) (?:%{HOUR:dt_h2}:|)(?:%{MINUTE:dt_m2}|)%{GREEDYDATA:CMD},%{GREEDYDATA:PWD_PATH}" }
add_field => [ "received_at", "%{@timestamp}" ]
remove_field => [ "@version", "host", "message", "_type", "_index", "_score" ]
}
}
}
output {
if [type] == "rmlog" {
elasticsearch {
hosts => ["myhost.xyz.com:9200"]
manage_template => false
index => "pt-rmlog-%{+YYYY.MM.dd}"
}
}
}
非常感谢任何帮助建议。
编辑:
根据我的观察失败的消息..
ttmv540,19/05/21,03:59,00-hrs,USER,USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND,/local/ntr/ttmv540.373
ttmv541,19/05/21,03:43,-mins,USER,USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND,/local/ntr/ttmv541.373
然而,我已经尝试用以下条件编辑 grok,但它仍然删除了几个字段..
input {
file {
path => [ "/data/rm_logs/*.txt" ]
start_position => beginning
max_open_files => 64000
sincedb_path => "/data/registry-1"
type => "rmlog"
}
}
filter {
if [type] == "rmlog" {
grok {
match => { "message" => "%{HOSTNAME:hostname},%{DATE:date},%{HOUR:time_h}:%{MINUTE:time_m},%{NUMBER:duration}-%{WORD:hm},%{USER:user},%{USER:group} %{NUMBER:pid} %{NUMBER:float} %{NUMBER:float} %{NUMBER:num_1} %{NUMBER:num_2} %{DATA} (?:%{HOUR:time_h1}:|)(?:%{MINUTE:time_m1}|) (?:%{HOUR:time_h2}:|)(?:%{MINUTE:time_m2}|)%{GREEDYDATA:cmd},%{GREEDYDATA:pwd}" }
add_field => [ "received_at", "%{@timestamp}" ]
remove_field => [ "@version", "host", "message", "_type", "_index", "_score" ]
}
}
if "_grokparsefailure" in [tags] {
grok {
match => { "message" => "%{HOSTNAME:hostname},%{DATE:date},%{HOUR:time_h}:%{MINUTE:time_m},-%{WORD:duration},%{USER:user},%{USER:group}%{GREEDYDATA:cmd}" }
add_field => [ "received_at", "%{@timestamp}" ]
remove_field => [ "@version", "host", "message", "_type", "_index", "_score" ]
}
}
}
output {
if [type] == "rmlog" {
elasticsearch {
hosts => ["myhost.xyz.com:9200"]
manage_template => false
index => "pt-rmlog-%{+YYYY.MM.dd}"
}
}
}
注意:看起来 _grokparsefailure
标签对以下消息有效,但对另一个消息仍然失败..
1) 这有效..
ttmv541,19/05/21,03:43,-mins,USER,USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND,/local/ntr/ttmv541.373
ttmv540,19/05/21,03:59,00-hrs,USER,USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND,/local/ntr/ttmv540.373
2) 文本日志的第二行失败,因为它有 00-hrs
与之关联的数字,现在无法满足以下 grok 的两个条件..
%{HOSTNAME:hostname},%{DATE:date},%{HOUR:time_h}:%{MINUTE:time_m},-%{WORD:duration},%{USER:user},%{USER:group}%{GREEDYDATA:cmd}
我将处理分为两部分,一部分处理主机名和时间戳问题,另一部分处理行的其余部分。我发现这使维护更容易。
所以你剩下这两个输入:
ttmv541,19/05/21,03:43,-mins
ttmv540,19/05/21,03:59,00-hrs
你的两个模式将很好地匹配第一部分,所以问题是你想如何在时间之后解析出这些东西。在您的原始模式中,您使用 duration
作为数字部分,使用 hm
作为单位。在您的第二个模式中,您似乎将单位放入 duration
,这可能是不正确的。
没有更多信息,持续时间似乎是可选的,但您将始终拥有单位。这可以反映在您的模式中,例如:
(%{NUMBER:duration})?-%{WORD:hm}
另请注意,如果您最终需要多个模式,则不必依赖 grokparsefailure 来使用它们 - match->message 可以采用数组。有关示例,请参阅 the doc。
我有一个数据,基本上是通过 rm 命令删除数据,如下所示。
ttmv516,19/05/21,03:59,00-mins,dvcm,dvcm 166820 4.1 0.0 4212 736 ? DN 03:59 0:01 rm -rf /dv/project/agile/mce_dev_folic/test/install.asan/install,/dv/svgwwt/commander/workspace4/dvfcronrun_IL-SFV-RHEL6.5-K4_kinite_agile_invoke_dvfcronrun_at_given_site_50322
我在下面使用 logstash grok,它工作正常,但直到最近我看到两个奇怪的问题 1) _grokparsefailure
另一个 2) Hostname Field
没有正确显示,即它的初始字符是不像 ttmv516
那样会出现 mv516
.
%{HOSTNAME:Hostname},%{DATE:Date},%{HOUR:dt_h}:%{MINUTE:dt_m},%{NUMBER:duration}-%{WORD:hm},%{USER:User},%{USER:User_1} %{NUMBER:Pid} %{NUMBER:float} %{NUMBER:float} %{NUMBER:Num_1} %{NUMBER:Num_2} %{DATA} (?:%{HOUR:dt_h1}:|)(?:%{MINUTE:dt_m1}|) (?:%{HOUR:dt_h2}:|)(?:%{MINUTE:dt_m2}|)%{GREEDYDATA:CMD},%{GREEDYDATA:PWD_PATH}
但是,在 Kibana 数据中使用 grok 调试器进行的测试显示正确。
我的logstash文件如下
cat /etc/logstash/conf.d/rmlog.conf
input {
file {
path => [ "/data/rm_logs/*.txt" ]
start_position => beginning
sincedb_path => "/data/registry-1"
max_open_files => 64000
type => "rmlog"
}
}
filter {
if [type] == "rmlog" {
grok {
match => { "message" => "%{HOSTNAME:Hostname},%{DATE:Date},%{HOUR:dt_h}:%{MINUTE:dt_m},%{NUMBER:duration}-%{WORD:hm},%{USER:User},%{USER:User_1} %{NUMBER:Pid} %{NUMBER:float} %{NUMBER:float} %{NUMBER:Num_1} %{NUMBER:Num_2} %{DATA} (?:%{HOUR:dt_h1}:|)(?:%{MINUTE:dt_m1}|) (?:%{HOUR:dt_h2}:|)(?:%{MINUTE:dt_m2}|)%{GREEDYDATA:CMD},%{GREEDYDATA:PWD_PATH}" }
add_field => [ "received_at", "%{@timestamp}" ]
remove_field => [ "@version", "host", "message", "_type", "_index", "_score" ]
}
}
}
output {
if [type] == "rmlog" {
elasticsearch {
hosts => ["myhost.xyz.com:9200"]
manage_template => false
index => "pt-rmlog-%{+YYYY.MM.dd}"
}
}
}
非常感谢任何帮助建议。
编辑:
根据我的观察失败的消息..
ttmv540,19/05/21,03:59,00-hrs,USER,USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND,/local/ntr/ttmv540.373
ttmv541,19/05/21,03:43,-mins,USER,USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND,/local/ntr/ttmv541.373
然而,我已经尝试用以下条件编辑 grok,但它仍然删除了几个字段..
input {
file {
path => [ "/data/rm_logs/*.txt" ]
start_position => beginning
max_open_files => 64000
sincedb_path => "/data/registry-1"
type => "rmlog"
}
}
filter {
if [type] == "rmlog" {
grok {
match => { "message" => "%{HOSTNAME:hostname},%{DATE:date},%{HOUR:time_h}:%{MINUTE:time_m},%{NUMBER:duration}-%{WORD:hm},%{USER:user},%{USER:group} %{NUMBER:pid} %{NUMBER:float} %{NUMBER:float} %{NUMBER:num_1} %{NUMBER:num_2} %{DATA} (?:%{HOUR:time_h1}:|)(?:%{MINUTE:time_m1}|) (?:%{HOUR:time_h2}:|)(?:%{MINUTE:time_m2}|)%{GREEDYDATA:cmd},%{GREEDYDATA:pwd}" }
add_field => [ "received_at", "%{@timestamp}" ]
remove_field => [ "@version", "host", "message", "_type", "_index", "_score" ]
}
}
if "_grokparsefailure" in [tags] {
grok {
match => { "message" => "%{HOSTNAME:hostname},%{DATE:date},%{HOUR:time_h}:%{MINUTE:time_m},-%{WORD:duration},%{USER:user},%{USER:group}%{GREEDYDATA:cmd}" }
add_field => [ "received_at", "%{@timestamp}" ]
remove_field => [ "@version", "host", "message", "_type", "_index", "_score" ]
}
}
}
output {
if [type] == "rmlog" {
elasticsearch {
hosts => ["myhost.xyz.com:9200"]
manage_template => false
index => "pt-rmlog-%{+YYYY.MM.dd}"
}
}
}
注意:看起来 _grokparsefailure
标签对以下消息有效,但对另一个消息仍然失败..
1) 这有效..
ttmv541,19/05/21,03:43,-mins,USER,USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND,/local/ntr/ttmv541.373
ttmv540,19/05/21,03:59,00-hrs,USER,USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND,/local/ntr/ttmv540.373
2) 文本日志的第二行失败,因为它有 00-hrs
与之关联的数字,现在无法满足以下 grok 的两个条件..
%{HOSTNAME:hostname},%{DATE:date},%{HOUR:time_h}:%{MINUTE:time_m},-%{WORD:duration},%{USER:user},%{USER:group}%{GREEDYDATA:cmd}
我将处理分为两部分,一部分处理主机名和时间戳问题,另一部分处理行的其余部分。我发现这使维护更容易。
所以你剩下这两个输入:
ttmv541,19/05/21,03:43,-mins
ttmv540,19/05/21,03:59,00-hrs
你的两个模式将很好地匹配第一部分,所以问题是你想如何在时间之后解析出这些东西。在您的原始模式中,您使用 duration
作为数字部分,使用 hm
作为单位。在您的第二个模式中,您似乎将单位放入 duration
,这可能是不正确的。
没有更多信息,持续时间似乎是可选的,但您将始终拥有单位。这可以反映在您的模式中,例如:
(%{NUMBER:duration})?-%{WORD:hm}
另请注意,如果您最终需要多个模式,则不必依赖 grokparsefailure 来使用它们 - match->message 可以采用数组。有关示例,请参阅 the doc。