基于字符串过滤消息
Filter message based on String
我在同一个日志文件中有以下日志
2019-11-23T14:38:43.495 backendorg [http-nio-8080-exec-45] INFO http-nio-8080-exec-45 SessionController http://localhost:8080/ABC/session/login abc.nayak@zinier.com backendorg
2019-11-23T14:38:44.235 backendorg [http-nio-8080-exec-45] INFO http-nio-8080-exec-45 SessionController userSession: backendorg 16CFAFCCFB14D9A3 16E978545E17BFEC 16E978545E1452FF
使用下面的过滤器根据字符串“userSession”解析上面的消息。
input {
file {
tags => ["stacktrace"]
type => "error_logs"
path => ["/Users/znrind-a0053/Downloads/logs/zapp-audit.log"]
start_position => "beginning"
sincedb_path => "/tmp/sincedb_file"
codec => multiline {
pattern => "^%{TIMESTAMP_ISO8601} "
negate => true
what => previous
}
}
}
filter {
if "userSession" in [message]{
grok {
match => [ "message",
"%{TIMESTAMP_ISO8601:timestamp_match} %{USERNAME:orgId} (\[%{DATA:thread}\])?( )?%{LOGLEVEL:level}%{SPACE}%{USERNAME:zhost} %{JAVAFILE:javaClass} %{URI:url}%{SPACE}(?<email>[\w.+=:-]+@[0-9A-Za-z][0-9A-Za-z-]{0,62}(?:[.](?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*)%{SPACE}%{USERNAME:orgnisation}"]
}
} else {
grok {
match => [ "message",
"%{TIMESTAMP_ISO8601:timestamp_match} %{USERNAME:orgId} (\[%{DATA:thread}\])?( )?%{LOGLEVEL:level}%{SPACE}%{USERNAME:zhost} %{JAVACLASS:javaClass} %{USERNAME:logmessage}:?%{SPACE}%{USERNAME:orgnisation}%{SPACE}%{USERNAME:loginUserId}%{SPACE}%{USERNAME:sessionId}%{SPACE}%{USERNAME:txnId}"]
}
}
}
output {
elasticsearch {
hosts => "localhost"
index => "logs"
}
stdout{codec => json}
}
但是收到 GROK 解析器错误。非常感谢任何建议。
对于电子邮件,您必须使用 ()
(?<email>[a-zA-Z0-9_.+=:-]+@[0-9A-Za-z][0-9A-Za-z-]{0,62}(?:\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*)
或
(?<email>[\w.+=:-]+@[0-9A-Za-z][0-9A-Za-z-]{0,62}(?:[.](?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*)
因此,您的比赛将成为
%{TIMESTAMP_ISO8601:timestamp_match} %{USERNAME:orgId} (\[%{DATA:thread}\])?( )?%{LOGLEVEL:level}%{SPACE}%{USERNAME:zhost} %{JAVAFILE:javaClass} %{URI:url}%{SPACE}(?<email>[\w.+=:-]+@[0-9A-Za-z][0-9A-Za-z-]{0,62}(?:[.](?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*)%{SPACE}%{USERNAME:orgnisation}
在 filter
试试这个:
filter {
if "userSession" in [message]{
grok {
match => [ "message",
"%{TIMESTAMP_ISO8601:timestamp_match} %{USERNAME:orgId} (\[%{DATA:thread}\])?( )?%{LOGLEVEL:level}%{SPACE}%{USERNAME:zhost} %{JAVAFILE:javaClass} %{USERNAME:logmessage}:?%{SPACE}%{USERNAME:orgnisation}%{SPACE}%{USERNAME:loginUserId}%{SPACE}%{USERNAME:sessionId}%{SPACE}%{USERNAME:txnId}"]
}
} else {
grok {
match => [ "message",
"%{TIMESTAMP_ISO8601:timestamp_match} %{USERNAME:orgId} (\[%{DATA:thread}\])?( )?%{LOGLEVEL:level}%{SPACE}%{USERNAME:zhost} %{JAVAFILE:javaClass} %{URI:url}%{SPACE}(?<email>[\w.+=:-]+@[0-9A-Za-z][0-9A-Za-z-]{0,62}(?:[.](?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*)%{SPACE}%{USERNAME:orgnisation}"]
}
}
}
我在同一个日志文件中有以下日志
2019-11-23T14:38:43.495 backendorg [http-nio-8080-exec-45] INFO http-nio-8080-exec-45 SessionController http://localhost:8080/ABC/session/login abc.nayak@zinier.com backendorg
2019-11-23T14:38:44.235 backendorg [http-nio-8080-exec-45] INFO http-nio-8080-exec-45 SessionController userSession: backendorg 16CFAFCCFB14D9A3 16E978545E17BFEC 16E978545E1452FF
使用下面的过滤器根据字符串“userSession”解析上面的消息。
input {
file {
tags => ["stacktrace"]
type => "error_logs"
path => ["/Users/znrind-a0053/Downloads/logs/zapp-audit.log"]
start_position => "beginning"
sincedb_path => "/tmp/sincedb_file"
codec => multiline {
pattern => "^%{TIMESTAMP_ISO8601} "
negate => true
what => previous
}
}
}
filter {
if "userSession" in [message]{
grok {
match => [ "message",
"%{TIMESTAMP_ISO8601:timestamp_match} %{USERNAME:orgId} (\[%{DATA:thread}\])?( )?%{LOGLEVEL:level}%{SPACE}%{USERNAME:zhost} %{JAVAFILE:javaClass} %{URI:url}%{SPACE}(?<email>[\w.+=:-]+@[0-9A-Za-z][0-9A-Za-z-]{0,62}(?:[.](?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*)%{SPACE}%{USERNAME:orgnisation}"]
}
} else {
grok {
match => [ "message",
"%{TIMESTAMP_ISO8601:timestamp_match} %{USERNAME:orgId} (\[%{DATA:thread}\])?( )?%{LOGLEVEL:level}%{SPACE}%{USERNAME:zhost} %{JAVACLASS:javaClass} %{USERNAME:logmessage}:?%{SPACE}%{USERNAME:orgnisation}%{SPACE}%{USERNAME:loginUserId}%{SPACE}%{USERNAME:sessionId}%{SPACE}%{USERNAME:txnId}"]
}
}
}
output {
elasticsearch {
hosts => "localhost"
index => "logs"
}
stdout{codec => json}
}
但是收到 GROK 解析器错误。非常感谢任何建议。
对于电子邮件,您必须使用 (
(?<email>[a-zA-Z0-9_.+=:-]+@[0-9A-Za-z][0-9A-Za-z-]{0,62}(?:\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*)
或
(?<email>[\w.+=:-]+@[0-9A-Za-z][0-9A-Za-z-]{0,62}(?:[.](?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*)
因此,您的比赛将成为
%{TIMESTAMP_ISO8601:timestamp_match} %{USERNAME:orgId} (\[%{DATA:thread}\])?( )?%{LOGLEVEL:level}%{SPACE}%{USERNAME:zhost} %{JAVAFILE:javaClass} %{URI:url}%{SPACE}(?<email>[\w.+=:-]+@[0-9A-Za-z][0-9A-Za-z-]{0,62}(?:[.](?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*)%{SPACE}%{USERNAME:orgnisation}
在 filter
试试这个:
filter {
if "userSession" in [message]{
grok {
match => [ "message",
"%{TIMESTAMP_ISO8601:timestamp_match} %{USERNAME:orgId} (\[%{DATA:thread}\])?( )?%{LOGLEVEL:level}%{SPACE}%{USERNAME:zhost} %{JAVAFILE:javaClass} %{USERNAME:logmessage}:?%{SPACE}%{USERNAME:orgnisation}%{SPACE}%{USERNAME:loginUserId}%{SPACE}%{USERNAME:sessionId}%{SPACE}%{USERNAME:txnId}"]
}
} else {
grok {
match => [ "message",
"%{TIMESTAMP_ISO8601:timestamp_match} %{USERNAME:orgId} (\[%{DATA:thread}\])?( )?%{LOGLEVEL:level}%{SPACE}%{USERNAME:zhost} %{JAVAFILE:javaClass} %{URI:url}%{SPACE}(?<email>[\w.+=:-]+@[0-9A-Za-z][0-9A-Za-z-]{0,62}(?:[.](?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*)%{SPACE}%{USERNAME:orgnisation}"]
}
}
}