How can I create a JSON format log when entering into Elasticsearch via Logstash?
I was told that by using a Logstash pipeline I can re-create the log format (i.e. JSON) when entering Elasticsearch, but I don't understand how to do it.
Current Logstash configuration (I took the following from Google, not for any particular reason):
/etc/logstash/conf.d/metrics-pipeline.conf
input {
  beats {
    port => 5044
    client_inactivity_timeout => "3600"
  }
}

filter {
  if [message] =~ />/ {
    dissect {
      mapping => {
        "message" => "%{start_of_message}>%{content}"
      }
    }
    kv {
      source => "content"
      value_split => ":"
      field_split => ","
      trim_key => "\[\]"
      trim_value => "\[\]"
      target => "message"
    }
    mutate {
      remove_field => ["content", "start_of_message"]
    }
  }
}

filter {
  if [system][process] {
    if [system][process][cmdline] {
      grok {
        match => {
          "[system][process][cmdline]" => "^%{PATH:[system][process][cmdline_path]}"
        }
        remove_field => "[system][process][cmdline]"
      }
    }
  }
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
  date {
    match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
  }
}

output {
  elasticsearch {
    hosts => "1.2.1.1:9200"
    manage_template => false
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
  }
}
I have a few log files located at:
/root/logs/File.log
/root/logs/File2.log
The log format is:
08:26:51,753 DEBUG [ABC] (default-threads - 78) (1.2.3.4)(368)>[TIMESTAMP:Wed Sep 11 08:26:51 UTC 2019],[IMEI:03537],[COMMAND:INFO],[GPS STATUS:true],[INFO:true],[SIGNAL:false],[ENGINE:0],[DOOR:0],[LON:90.43],[LAT:23],[SPEED:0.0],[HEADING:192.0],[BATTERY:100.0%],[CHARGING:1],[O&E:CONNECTED],[GSM_SIGNAL:100],[GPS_SATS:5],[GPS POS:true],[FUEL:0.0V/0.0%],[ALARM:NONE][SERIAL:01EE]
In Kibana it shows by default like this:
https://imgshare.io/image/stackflow.I0u7S
https://imgshare.io/image/jsonlog.IHQhp
"message": "21:33:42,004 DEBUG [LOG] (default-threads - 100) (1.2.3.4)(410)>[TIMESTAMP:Sat Sep 07 21:33:42 UTC 2019],[TEST:123456],[CMD:INFO],[STATUS:true],[INFO:true],[SIGNAL:false],[ABC:0],[DEF:0],[GHK:1111],[SERIAL:0006]"
But I want to get it like this:
"message": {
"TIMESTAMP": "Sat Sep 07 21:33:42 UTC 2019",
"TEST": "123456",
"CMD":INFO,
"STATUS":true,
"INFO":true,
"SIGNAL":false,
"ABC":0,
"DEF":0,
"GHK":0,
"GHK":1111
}
Can this be done? If so, how?
Thanks
With if [message] =~ />/, the filters will only apply to messages containing a >. The dissect filter splits the message on the >. The kv filter then applies a key-value transformation to the second part of the message, removing the [ and ]. Finally, mutate.remove_field drops the extra fields.
filter {
  if [message] =~ />/ {
    dissect {
      mapping => {
        "message" => "%{start_of_message}>%{content}"
      }
    }
    kv {
      source => "content"
      value_split => ":"
      field_split => ","
      trim_key => "\[\]"
      trim_value => "\[\]"
      target => "message"
    }
    mutate {
      remove_field => ["content", "start_of_message"]
    }
  }
}
The result, with the provided log line:
{
  "@version": "1",
  "host": "YOUR_MACHINE_NAME",
  "message": {
    "DEF": "0",
    "TIMESTAMP": "Sat Sep 07 21:33:42 UTC 2019",
    "TEST": "123456",
    "CMD": "INFO",
    "SERIAL": "0006]\r",
    "GHK": "1111",
    "INFO": "true",
    "STATUS": "true",
    "ABC": "0",
    "SIGNAL": "false"
  },
  "@timestamp": "2019-09-10T09:21:16.422Z"
}
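Note the trailing "]\r" left on the SERIAL value: the log lines end with a Windows-style carriage return, and since \r is not in the kv trim set, neither it nor the "]" before it gets trimmed. A minimal sketch of one way to clean this up, assuming a mutate placed before the dissect filter (the gsub pattern is the only addition here):

filter {
  # Assumption: the shipped lines end with a Windows-style "\r".
  # Strip it from the raw message before dissect/kv run, so that
  # trim_value can then remove the trailing "]" from the last field
  # and SERIAL comes out as "0006" instead of "0006]\r".
  mutate {
    gsub => ["message", "\r$", ""]
  }
}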
Instead of filtering with if [message] =~ />/, you could also compare against the path field, which is set by the file input plugin. Also, if you have multiple file inputs, you can set the type field and use that in your conditionals; see the file input plugin documentation.
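A minimal sketch of that alternative, assuming hypothetical file inputs for the two logs (the type value device_log is made up for illustration):

input {
  file {
    path => "/root/logs/File.log"
    type => "device_log"   # hypothetical tag for these inputs
  }
  file {
    path => "/root/logs/File2.log"
    type => "device_log"
  }
}

filter {
  # Run the dissect/kv/mutate pipeline above only for events from
  # these inputs, instead of matching on the message content.
  if [type] == "device_log" {
    # ... dissect / kv / mutate filters from above ...
  }
}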