ELK Stack 的 Grok 条件解析

Grok conditional parsing for ELK Stack

我有这样的日志:

2020-09-02 14:29:22,854 [http-something] [ERROR] JavaClass(JavaLine) - [6652942]: Error message with no stack trace
2020-09-02 14:29:08,976 [http-something] [INFO] JavaClass(JavaLine) - [6791732]: Some message
2020-09-02 14:29:09,116 [http-something] [ERROR] JavaClass(JavaLine) - [6791732]: Error message with stack trace
JavaException: This is not going well
    at JavaClass
    at JavaClass
    at JavaClass
    at JavaClass
    at JavaClass
Caused by: JavaClass: This is a problem
    at JavaClass
    at JavaClass
    at JavaClass
    at JavaClass
    ... 48 more

我使用这个过滤器在 Kibana 上获得更具可读性的日志:

filter {

    # INFO and ERROR
    grok {
        tag_on_failure => ["_stackTraceFailure"]
        match => { "message" => "%{TIMESTAMP_ISO8601:timestamp}%{SPACE}(\[%{DATA:thread}\])?%{SPACE}\[%{LOGLEVEL:log_level}\]%{SPACE}%{GREEDYDATA}%{SPACE}\-%{SPACE}%{GREEDYDATA:action}" }
        overwrite => [ "message" ]
    }

    # JAVA ERROR
    if ("_stackTraceFailure" in [tags]) {
        grok { 
            tag_on_failure => ["_grokParseFailure"]
            match => { "message" => "%{TIMESTAMP_ISO8601:timestamp}%{SPACE}(\[%{DATA:thread}\])?%{SPACE}\[%{LOGLEVEL:log_level}\]%{SPACE}%{GREEDYDATA}%{SPACE}\-%{SPACE}%{DATA:issue}(\r|\n)+(?m)%{GREEDYDATA:stack-trace}" }
            overwrite => [ "message" ]
            remove_tag => "_stackTraceFailure"
        }
    }
}

问题是第一个模式匹配所有内容,将所有堆栈跟踪(如果有)放在 action 标记中,导致第二个模式永远不会匹配用过的。我知道这个问题是由 GREEDYDATA 引起的,但我对正则表达式不是很熟练,而且我找不到解决方案来做我想做的事。

我不想交换模式的位置,因为 INFO 和 ERROR(没有堆栈跟踪)更常见,所以我需要一种方法让第一个在多行日志或任何情况下失败如果有某种堆栈跟踪,这将使第一个失败。我可以从目前所做的开始做起吗?

你需要在 groks 之前使用条件语句。您可以使用条件过滤整个消息并使用两个不同的 grok 过滤器,或者您可以保持第一个 grok 过滤器相同并使用条件仅解析 action字段,我会建议第二个选项。

在这两种情况下,您都需要根据仅存在于多行消息中的内容进行条件过滤,在本例中可能是 "at JavaClass" 字符串。

所以你需要这样的东西:

if "at JavaClass" not in [message] {
  grok { your first grok }
} else {
    grok { your second grok }
}

如果你想保留你的第一个 grok 而使用第二个只解析 action 字段,它会像这样。

if "at JavaClass" in [action] {
    grok {
        tag_on_failure => ["_grokParseFailure"]
        match => { "action" => "%{DATA:issue}(\r|\n)+(?m)%{GREEDYDATA:stack-trace}" }
    }
}

你没有说你是如何收集日志的,如果你使用 filebeat 或 logstash 并在输入中编码 multiline,你也可以根据标签进行过滤,因为你会有为您的日志命名为 multiline 的标签。