Logstash 将非结构化日志行合并到一个事件
Logstash to merge unstructured log lines to one event
我的日志格式是这样的
[23:59:43] TYPE[student03]
[23:59:43] Write Ack!
[23:59:43] <application><status>Active</status></application>
[23:59:43] --------PushToFile Start--------
[23:59:43] --------PushToFile Name --------
[23:59:43] --------PushToFile End--------
[23:59:47] --------on parser--------
[23:59:47] ClientIPAddress > 123.12.123.2
[23:59:43] TYPE[student04]
[23:59:43] Write Ack!
[23:59:43] FGHSFHG3453D56HJ3456FJ372GJ1387FFKJH
[23:59:43] --------PushToFile Start--------
[23:59:43] --------PushToFile Name --------
[23:59:43] --------PushToFile End--------
[23:59:47] --------on parser--------
[23:59:47] ClientIPAddress > 123.12.123.2
[23:59:43] TYPE[student03]
[23:59:43] Write Ack!
[23:59:43] FGHSFHG3453D56HJ3456FJ372GJ1387FFKJH
[23:59:43] --------PushToFile Start--------
[23:59:43] --------PushToFile Name --------
[23:59:43] --------PushToFile End--------
[23:59:47] --------on parser--------
[23:59:47] ClientIPAddress > 123.12.123.2
我想解析这些日志以仅获取 TYPE[student03] 和学生的状态。我正在使用 lostash 过滤器删除剩余的行。现在我需要将 TYPE 和状态作为单个事件,以便我可以将它作为单个消息发送到 rabbitmq,例如
[23:59:43]-student03-<application><status>Active</status></application>
我们如何使用聚合过滤器实现这一目标?我试过聚合过滤器。但是,我认为我们需要在两个日志行中使用相同的模式才能与聚合合并。或任何替代方案
请帮忙!
我的logstash配置文件
beats {
port => 5044
}
}
filter {
if ([message] =~ "Write Ack!"){
drop{}
}
else if ([message] =~ "PushToFile Start"){
drop{}
}
else if ([message] =~ "PushToFile Name"){
drop{}
}
else if ([message] =~ "PushToFile End"){
drop{}
}
else if ([message] =~ "on parser"){
drop{}
}
else if ([message] =~ "ClientIPAddress") {
drop { }
}
}
output {
stdout { codec => rubydebug }
}
您可以使用 Grok 过滤器(见下文)仅解析两个必需的行,然后使用下面的 Ruby 过滤器 进行必要的字符串操作并将最终消息保存在字段中并使用它。
Grok 过滤器
grok{
match => {"message" => "%{TIME}] TYPE.%{GREEDYDATA:STATUS}."}
match => {"message" => "%{TIME}] <application><status>Active</status></application>"}
}
if "_grokparsefailure" in [tags]{drop{}}
Ruby 过滤器
ruby{
code => "
if event.get('STATUS')
@save_status = event.get('STATUS')
event.cancel()
else
event.set('final_message', event.get('message').sub(' ', '-'+@save_status+'-'))
end
"
}
请注意,这仅在您的日志按顺序出现时才有效。
我的日志格式是这样的
[23:59:43] TYPE[student03]
[23:59:43] Write Ack!
[23:59:43] <application><status>Active</status></application>
[23:59:43] --------PushToFile Start--------
[23:59:43] --------PushToFile Name --------
[23:59:43] --------PushToFile End--------
[23:59:47] --------on parser--------
[23:59:47] ClientIPAddress > 123.12.123.2
[23:59:43] TYPE[student04]
[23:59:43] Write Ack!
[23:59:43] FGHSFHG3453D56HJ3456FJ372GJ1387FFKJH
[23:59:43] --------PushToFile Start--------
[23:59:43] --------PushToFile Name --------
[23:59:43] --------PushToFile End--------
[23:59:47] --------on parser--------
[23:59:47] ClientIPAddress > 123.12.123.2
[23:59:43] TYPE[student03]
[23:59:43] Write Ack!
[23:59:43] FGHSFHG3453D56HJ3456FJ372GJ1387FFKJH
[23:59:43] --------PushToFile Start--------
[23:59:43] --------PushToFile Name --------
[23:59:43] --------PushToFile End--------
[23:59:47] --------on parser--------
[23:59:47] ClientIPAddress > 123.12.123.2
我想解析这些日志以仅获取 TYPE[student03] 和学生的状态。我正在使用 lostash 过滤器删除剩余的行。现在我需要将 TYPE 和状态作为单个事件,以便我可以将它作为单个消息发送到 rabbitmq,例如
[23:59:43]-student03-<application><status>Active</status></application>
我们如何使用聚合过滤器实现这一目标?我试过聚合过滤器。但是,我认为我们需要在两个日志行中使用相同的模式才能与聚合合并。或任何替代方案
请帮忙!
我的logstash配置文件
beats {
port => 5044
}
}
filter {
if ([message] =~ "Write Ack!"){
drop{}
}
else if ([message] =~ "PushToFile Start"){
drop{}
}
else if ([message] =~ "PushToFile Name"){
drop{}
}
else if ([message] =~ "PushToFile End"){
drop{}
}
else if ([message] =~ "on parser"){
drop{}
}
else if ([message] =~ "ClientIPAddress") {
drop { }
}
}
output {
stdout { codec => rubydebug }
}
您可以使用 Grok 过滤器(见下文)仅解析两个必需的行,然后使用下面的 Ruby 过滤器 进行必要的字符串操作并将最终消息保存在字段中并使用它。
Grok 过滤器
grok{
match => {"message" => "%{TIME}] TYPE.%{GREEDYDATA:STATUS}."}
match => {"message" => "%{TIME}] <application><status>Active</status></application>"}
}
if "_grokparsefailure" in [tags]{drop{}}
Ruby 过滤器
ruby{
code => "
if event.get('STATUS')
@save_status = event.get('STATUS')
event.cancel()
else
event.set('final_message', event.get('message').sub(' ', '-'+@save_status+'-'))
end
"
}
请注意,这仅在您的日志按顺序出现时才有效。