Logstash does not process files sent by filebeat

I have set up an ELK stack infrastructure using Docker. I don't see any files being processed by Logstash.

Filebeat is configured to send .csv files to Logstash, and from Logstash on to Elasticsearch. I can see the Logstash Beats listener starting. The Logstash-to-Elasticsearch pipeline works, but no document or index is written.

Please advise.

filebeat.yml

    filebeat.prospectors:
    - input_type: log
      paths:
         - logs/sms/*.csv
      document_type: sms
      paths:
         - logs/voip/*.csv
      document_type: voip

    output.logstash:
      enabled: true
      hosts: ["logstash:5044"]

    logging.to_files: true
    logging.files:

logstash.conf

input {
  beats {
    port => "5044"
  }
}

filter {
  if [document_type] == "sms" {
    csv {
      columns => ['Date', 'Time', 'PLAN', 'CALL_TYPE', 'MSIDN', 'IMSI', 'IMEI']
      separator => " "
      skip_empty_columns => true
      quote_char => "'"
    }
  }
  if [document_type] == "voip" {
    csv {
      columns => ['Date', 'Time', 'PostDialDelay', 'Disconnect-Cause', 'Sip-Status', 'Session-Disposition', 'Calling-RTP-Packets-Lost', 'Called-RTP-Packets-Lost', 'Calling-RTP-Avg-Jitter', 'Called-RTP-Avg-Jitter', 'Calling-R-Factor', 'Called-R-Factor', 'Calling-MOS', 'Called-MOS', 'Ingress-SBC', 'Egress-SBC', 'Originating-Trunk-Group', 'Terminating-Trunk-Group']
      separator => " "
      skip_empty_columns => true
      quote_char => "'"
    }
  }
}

output {
  if [document_type] == "sms" {
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      index => "smscdr_index"
    }
    stdout {
      codec => rubydebug
    }
  }
  if [document_type] == "voip" {
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      index => "voipcdr_index"
    }
    stdout {
      codec => rubydebug
    }
  }
}

Logstash partial logs

[2019-12-05T12:48:38,227][INFO ][logstash.inputs.beats    ] Beats inputs: Starting input listener {:address=>"0.0.0.0:5044"}
[2019-12-05T12:48:38,411][INFO ][logstash.pipeline        ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x4ffc5251 run>"}
[2019-12-05T12:48:38,949][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2019-12-05T12:48:39,077][INFO ][org.logstash.beats.Server] Starting server on port: 5044
==========================================================================================
[2019-12-05T12:48:43,518][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["http://elasticsearch:9200"]}
[2019-12-05T12:48:43,745][INFO ][logstash.pipeline        ] Pipeline started successfully {:pipeline_id=>".monitoring-logstash", :thread=>"#<Thread:0x46e8e60c run>"}
[2019-12-05T12:48:43,780][INFO ][logstash.agent           ] Pipelines running {:count=>2, :running_pipelines=>[:".monitoring-logstash", :main], :non_running_pipelines=>[]}
[2019-12-05T12:48:45,770][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}

Filebeat log sample

2019-12-05T12:55:33.119Z        INFO    log/harvester.go:255    Harvester started for file: /usr/share/filebeat/logs/voip/voip_cdr_1595.csv
2019-12-05T12:55:33.126Z        INFO    log/harvester.go:255    Harvester started for file: /usr/share/filebeat/logs/voip/voip_cdr_2004.csv
2019-12-05T12:55:33.130Z        INFO    log/harvester.go:255    Harvester started for file: /usr/share/filebeat/logs/voip/voip_cdr_2810.csv
======================================================================================================
2019-12-05T13:00:44.002Z        INFO    log/harvester.go:280    File is inactive: /usr/share/filebeat/logs/voip/voip_cdr_563.csv. Closing because close_inactive of 5m0s reached.
2019-12-05T13:00:44.003Z        INFO    log/harvester.go:280    File is inactive: /usr/share/filebeat/logs/voip/voip_cdr_2729.csv. Closing because close_inactive of 5m0s reached.
2019-12-05T13:00:44.004Z        INFO    log/harvester.go:280    File is inactive: /usr/share/filebeat/logs/voip/voip_cdr_2308.csv. Closing because close_inactive of 5m0s reached.
2019-12-05T13:00:49.218Z        INFO    log/harvester.go:280    File is inactive: /usr/share/filebeat/logs/voip/voip_cdr_981.csv. Closing because close_inactive of 5m0s reached.

docker-compose ps

docker-compose -f docker-compose_stash.yml ps
The system cannot find the path specified.
      Name                     Command               State                            Ports
---------------------------------------------------------------------------------------------------------------------
elasticsearch_cdr   /usr/local/bin/docker-entr ...   Up      0.0.0.0:9200->9200/tcp, 9300/tcp
filebeat_cdr        /usr/local/bin/docker-entr ...   Up
kibana_cdr          /usr/local/bin/kibana-docker     Up      0.0.0.0:5601->5601/tcp
logstash_cdr        /usr/local/bin/docker-entr ...   Up      0.0.0.0:5000->5000/tcp, 0.0.0.0:5044->5044/tcp, 9600/tcp

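In Logstash you have conditionals that check the field document_type, but Filebeat does not generate that field, so no event matches any of them. You need to correct your Filebeat configuration.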
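One way to confirm which fields actually arrive is to print every event unconditionally for a while. In the logstash.conf from the question, both stdout outputs sit inside the document_type conditionals, so an event that matches neither branch is not printed or indexed anywhere. The following is a minimal debugging sketch, meant to be removed once the event structure is known:

```conf
output {
  # Temporary debug output: placed outside any conditional, so it prints
  # every event received from Beats, whatever fields it carries.
  stdout {
    codec => rubydebug
  }
}
```

If the printed events show no top-level document_type field, the conditionals in the filter and output sections can never be true.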

Try this configuration for your inputs.

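filebeat.prospectors:
# Each group of paths needs its own prospector entry; repeating `paths`
# and `fields` inside a single entry creates duplicate YAML keys.
- input_type: log
  paths:
     - logs/sms/*.csv
  fields:
    document_type: sms
- input_type: log
  paths:
     - logs/voip/*.csv
  fields:
    document_type: voip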

This creates a field named fields with a nested field named document_type, as in the example below.

{ "fields" : { "document_type" : "voip" } }

Then change your Logstash conditionals to check against the nested field fields.document_type, as in the example below.

if [fields][document_type] == "sms" {
  # your filters
}
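Applied to the logstash.conf from the question, the change might look like the sketch below. It only covers the sms branch and reuses the columns, hosts, and index name from the question; the voip branch would follow the same pattern. The conditional has to change in both the filter and the output section, since both currently test the top-level field.

```conf
filter {
  # Nested field reference: matches events carrying fields.document_type = "sms"
  if [fields][document_type] == "sms" {
    csv {
      columns => ['Date', 'Time', 'PLAN', 'CALL_TYPE', 'MSIDN', 'IMSI', 'IMEI']
      separator => " "
      skip_empty_columns => true
      quote_char => "'"
    }
  }
}

output {
  # Same nested reference here, otherwise parsed events still reach no output
  if [fields][document_type] == "sms" {
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      index => "smscdr_index"
    }
  }
}
```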

If you prefer, you can set the Filebeat option fields_under_root: true to create document_type at the root of the document, so you do not need to change your Logstash conditionals.

filebeat.prospectors:
- input_type: log
  paths:
     - logs/sms/*.csv
  fields:
    document_type: sms
  fields_under_root: true
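With that option the original top-level conditionals should match again. The sketch below shows the voip output branch from the question unchanged, on the assumption that the voip prospector is given the same fields and fields_under_root settings as the sms one shown above:

```conf
output {
  # With fields_under_root: true, document_type sits at the top level of the
  # event, so no [fields] prefix is needed in the field reference.
  if [document_type] == "voip" {
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      index => "voipcdr_index"
    }
  }
}
```

One thing to keep in mind with this approach: a custom field placed at the root overwrites any field of the same name that Filebeat itself adds, so the nested form is the safer choice when a name collision is possible.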