Logstash 将非结构化日志行合并到一个事件

Logstash to merge unstructured log lines to one event

我的日志格式是这样的

[23:59:43] TYPE[student03]
[23:59:43] Write Ack!
[23:59:43] <application><status>Active</status></application>
[23:59:43] --------PushToFile Start--------
[23:59:43] --------PushToFile Name --------
[23:59:43] --------PushToFile End--------
[23:59:47] --------on parser--------
[23:59:47] ClientIPAddress > 123.12.123.2
[23:59:43] TYPE[student04]
[23:59:43] Write Ack!
[23:59:43] FGHSFHG3453D56HJ3456FJ372GJ1387FFKJH
[23:59:43] --------PushToFile Start--------
[23:59:43] --------PushToFile Name --------
[23:59:43] --------PushToFile End--------
[23:59:47] --------on parser--------
[23:59:47] ClientIPAddress > 123.12.123.2
[23:59:43] TYPE[student03]
[23:59:43] Write Ack!
[23:59:43] FGHSFHG3453D56HJ3456FJ372GJ1387FFKJH
[23:59:43] --------PushToFile Start--------
[23:59:43] --------PushToFile Name --------
[23:59:43] --------PushToFile End--------
[23:59:47] --------on parser--------
[23:59:47] ClientIPAddress > 123.12.123.2

我想解析这些日志以仅获取 TYPE[student03] 和学生的状态。我正在使用 lostash 过滤器删除剩余的行。现在我需要将 TYPE 和状态作为单个事件,以便我可以将它作为单个消息发送到 rabbitmq,例如

[23:59:43]-student03-<application><status>Active</status></application>

我们如何使用聚合过滤器实现这一目标?我试过聚合过滤器。但是,我认为我们需要在两个日志行中使用相同的模式才能与聚合合并。或任何替代方案

请帮忙!

我的logstash配置文件

    beats {
     port => 5044
   }
}
filter {

  if ([message] =~ "Write Ack!"){
   drop{}
  }
  else if ([message] =~ "PushToFile Start"){
    drop{}
  }
  else if ([message] =~ "PushToFile Name"){
    drop{}
  }
  else if ([message] =~ "PushToFile End"){
    drop{}
  }
  else if ([message] =~ "on parser"){
     drop{}
  }  
  else if ([message] =~ "ClientIPAddress") {
    drop { }
  }                     
}
output {
 stdout { codec => rubydebug }
}

您可以使用 Grok 过滤器(见下文)仅解析两个必需的行,然后使用下面的 Ruby 过滤器 进行必要的字符串操作并将最终消息保存在字段中并使用它。

Grok 过滤器

grok{
    match => {"message" => "%{TIME}] TYPE.%{GREEDYDATA:STATUS}."}
    match => {"message" => "%{TIME}] <application><status>Active</status></application>"}
    }
if "_grokparsefailure" in [tags]{drop{}}

Ruby 过滤器

ruby{
    code => "
        if event.get('STATUS')
            @save_status = event.get('STATUS')
            event.cancel()
        else
            event.set('final_message', event.get('message').sub(' ', '-'+@save_status+'-'))
        end
    "
}

请注意,这仅在您的日志按顺序出现时才有效。