Filebeat/Logstash 多行系统日志解析
Filebeat/Logstash Multiline Syslog Parsing
我正在将系统日志解析到 ELK 堆栈中。
系统日志示例
Jul 19 10:47:21 host-abc systemd: Started myservice
Jul 19 10:47:29 host-abc systemd: Started service.
Jul 19 10:47:29 host-abc systemd: Starting service...
最理想的是将第2行和第3行聚合成一条消息,例如返回:Started Service. Starting service...
因此我希望时间戳、主机名和程序名在组合行之前匹配。
您可以使用 aggregate
过滤器来达到您想要的效果。 aggregate
过滤器支持基于公共字段值将多个日志行聚合到一个事件中。在您的情况下,公共字段将是 @timestamp
、hostname
和 program_name
.
的组合
由于 syslog
输入已经正确解析系统日志行,我们不需要 grok 任何东西,所以我们可以立即利用 aggregate
过滤器。我们根据 SYSLOGBASE2
字段聚合行,该字段将包含冒号字符 :
之前的所有内容。然后我们简单地收集所有消息,最后我们将消息连接成一个字符串。它是这样的:
input {
syslog {
...
}
}
filter {
aggregate {
task_id => "%{SYSLOGBASE2}"
code => "map['message'] ||= []; map['message'].push(event.get('message'));"
push_map_as_event_on_timeout => true
timeout_task_id_field => "user_id"
timeout => 1 # 1 second timeout
timeout_tags => ['_aggregatetimeout']
timeout_code => "event.set('message', map['message'].join(' '))"
}
}
output {
...
}
我正在将系统日志解析到 ELK 堆栈中。
系统日志示例
Jul 19 10:47:21 host-abc systemd: Started myservice
Jul 19 10:47:29 host-abc systemd: Started service.
Jul 19 10:47:29 host-abc systemd: Starting service...
最理想的是将第2行和第3行聚合成一条消息,例如返回:Started Service. Starting service...
因此我希望时间戳、主机名和程序名在组合行之前匹配。
您可以使用 aggregate
过滤器来达到您想要的效果。 aggregate
过滤器支持基于公共字段值将多个日志行聚合到一个事件中。在您的情况下,公共字段将是 @timestamp
、hostname
和 program_name
.
由于 syslog
输入已经正确解析系统日志行,我们不需要 grok 任何东西,所以我们可以立即利用 aggregate
过滤器。我们根据 SYSLOGBASE2
字段聚合行,该字段将包含冒号字符 :
之前的所有内容。然后我们简单地收集所有消息,最后我们将消息连接成一个字符串。它是这样的:
input {
syslog {
...
}
}
filter {
aggregate {
task_id => "%{SYSLOGBASE2}"
code => "map['message'] ||= []; map['message'].push(event.get('message'));"
push_map_as_event_on_timeout => true
timeout_task_id_field => "user_id"
timeout => 1 # 1 second timeout
timeout_tags => ['_aggregatetimeout']
timeout_code => "event.set('message', map['message'].join(' '))"
}
}
output {
...
}