使用 logstash(ELK 堆栈)解析 json

Parsing json using logstash (ELK stack)

我创建了一个简单的 json,如下所示

[
    {
        "Name": "vishnu",
        "ID": 1
    },
    {
        "Name": "vishnu",
        "ID": 1
    }
] 

我将这些值保存在名为 simple.txt 的文件中。然后我使用 file beat 来监听文件并将新的更新发送到端口 5043,另一方面我启动了监听这个端口的 log-stash 服务,以便解析并将 json 传递给弹性搜索。 log-stash 没有处理 json 值,它挂在中间。

logstash

input {
  beats {
    port => 5043
    host => "0.0.0.0"
    client_inactivity_timeout => 3600
  }
}
filter {
  json {
    source => "message"
  }
}
output {
    stdout { codec => rubydebug }
}

filebeat 配置:

filebeat.prospectors:

    - input_type: log
      paths:
        - filepath
    output.logstash:
      hosts: ["localhost:5043"]

Logstash 输出

**

Sending Logstash's logs to D:/elasticdb/logstash-5.6.3/logstash-5.6.3/logs which is now configured via log4j2.properties
[2017-10-31T19:01:17,574][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"D:/elasticdb/logstash-5.6.3/logstash-5.6.3/modules/fb_apache/configuration"}
[2017-10-31T19:01:17,578][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"D:/elasticdb/logstash-5.6.3/logstash-5.6.3/modules/netflow/configuration"}
[2017-10-31T19:01:18,301][INFO ][logstash.pipeline        ] Starting pipeline {"id"=>"main", "pipeline.workers"=>2, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>250}
[2017-10-31T19:01:18,388][INFO ][logstash.inputs.beats    ] Beats inputs: Starting input listener {:address=>"0.0.0.0:5043"}
[2017-10-31T19:01:18,573][INFO ][logstash.pipeline        ] Pipeline main started
[2017-10-31T19:01:18,591][INFO ][org.logstash.beats.Server] Starting server on port: 5043
[2017-10-31T19:01:18,697][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}

**

每次我 运行 log-stash 使用命令

logstash -f logstash.conf

并且由于没有处理 json 我将按 ctrl + c 停止该服务。

请帮我提前找到solution.Thanks

最后我得到了像 this.It 这样适合我的配置。

input 
{
    file 
    {
       codec => multiline
        {
            pattern => '^\{'
            negate => true
            what => previous                
        }
        path => "D:\elasticdb\logstash-tutorial.log\Test.txt"
        start_position => "beginning"       
        sincedb_path => "D:\elasticdb\logstash-tutorial.log\null"
        exclude => "*.gz"
    }
}

filter {
json {
source => "message"
remove_field => ["path","@timestamp","@version","host","message"]
}
}

output {
  elasticsearch { hosts => ["localhost"] 
  index => "logs"
  "document_type" => "json_from_logstash_attempt3"
  }
  stdout{}
}

Json格式:

{"name":"sachin","ID":"1","TS":1351146569}
{"name":"sachin","ID":"1","TS":1351146569}
{"name":"sachin","ID":"1","TS":1351146569}