Logstash does not process files sent by Filebeat
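I have set up an ELK stack infrastructure using Docker.
I can't see any files being processed by Logstash.
Filebeat is configured to send .csv files to Logstash, which forwards them to Elasticsearch. I can see the Logstash Beats listener starting. The Logstash-to-Elasticsearch pipeline works, but no document or index is written.
Please advise.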
filebeat.yml
filebeat.prospectors:
- input_type: log
  paths:
    - logs/sms/*.csv
  document_type: sms
  paths:
    - logs/voip/*.csv
  document_type: voip
output.logstash:
  enabled: true
  hosts: ["logstash:5044"]
logging.to_files: true
logging.files:
logstash.conf
input {
  beats {
    port => "5044"
  }
}
filter {
  if [document_type] == "sms" {
    csv {
      columns => ['Date', 'Time', 'PLAN', 'CALL_TYPE', 'MSIDN', 'IMSI', 'IMEI']
      separator => " "
      skip_empty_columns => true
      quote_char => "'"
    }
  }
  if [document_type] == "voip" {
    csv {
      columns => ['Date', 'Time', 'PostDialDelay', 'Disconnect-Cause', 'Sip-Status', 'Session-Disposition', 'Calling-RTP-Packets-Lost', 'Called-RTP-Packets-Lost', 'Calling-RTP-Avg-Jitter', 'Called-RTP-Avg-Jitter', 'Calling-R-Factor', 'Called-R-Factor', 'Calling-MOS', 'Called-MOS', 'Ingress-SBC', 'Egress-SBC', 'Originating-Trunk-Group', 'Terminating-Trunk-Group']
      separator => " "
      skip_empty_columns => true
      quote_char => "'"
    }
  }
}
output {
  if [document_type] == "sms" {
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      index => "smscdr_index"
    }
    stdout {
      codec => rubydebug
    }
  }
  if [document_type] == "voip" {
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      index => "voipcdr_index"
    }
    stdout {
      codec => rubydebug
    }
  }
}
Partial Logstash logs
[2019-12-05T12:48:38,227][INFO ][logstash.inputs.beats ] Beats inputs: Starting input listener {:address=>"0.0.0.0:5044"}
[2019-12-05T12:48:38,411][INFO ][logstash.pipeline ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x4ffc5251 run>"}
[2019-12-05T12:48:38,949][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2019-12-05T12:48:39,077][INFO ][org.logstash.beats.Server] Starting server on port: 5044
==========================================================================================
[2019-12-05T12:48:43,518][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["http://elasticsearch:9200"]}
[2019-12-05T12:48:43,745][INFO ][logstash.pipeline ] Pipeline started successfully {:pipeline_id=>".monitoring-logstash", :thread=>"#<Thread:0x46e8e60c run>"}
[2019-12-05T12:48:43,780][INFO ][logstash.agent ] Pipelines running {:count=>2, :running_pipelines=>[:".monitoring-logstash", :main], :non_running_pipelines=>[]}
[2019-12-05T12:48:45,770][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
Sample Filebeat logs
2019-12-05T12:55:33.119Z INFO log/harvester.go:255 Harvester started for file: /usr/share/filebeat/logs/voip/voip_cdr_1595.csv
2019-12-05T12:55:33.126Z INFO log/harvester.go:255 Harvester started for file: /usr/share/filebeat/logs/voip/voip_cdr_2004.csv
2019-12-05T12:55:33.130Z INFO log/harvester.go:255 Harvester started for file: /usr/share/filebeat/logs/voip/voip_cdr_2810.csv
======================================================================================================
2019-12-05T13:00:44.002Z INFO log/harvester.go:280 File is inactive: /usr/share/filebeat/logs/voip/voip_cdr_563.csv. Closing because close_inactive of 5m0s reached.
2019-12-05T13:00:44.003Z INFO log/harvester.go:280 File is inactive: /usr/share/filebeat/logs/voip/voip_cdr_2729.csv. Closing because close_inactive of 5m0s reached.
2019-12-05T13:00:44.004Z INFO log/harvester.go:280 File is inactive: /usr/share/filebeat/logs/voip/voip_cdr_2308.csv. Closing because close_inactive of 5m0s reached.
2019-12-05T13:00:49.218Z INFO log/harvester.go:280 File is inactive: /usr/share/filebeat/logs/voip/voip_cdr_981.csv. Closing because close_inactive of 5m0s reached.
docker-compose ps
docker-compose -f docker-compose_stash.yml ps
The system cannot find the path specified.
Name                Command                          State   Ports
---------------------------------------------------------------------------------------------------------------------
elasticsearch_cdr   /usr/local/bin/docker-entr ...   Up      0.0.0.0:9200->9200/tcp, 9300/tcp
filebeat_cdr        /usr/local/bin/docker-entr ...   Up
kibana_cdr          /usr/local/bin/kibana-docker     Up      0.0.0.0:5601->5601/tcp
logstash_cdr        /usr/local/bin/docker-entr ...   Up      0.0.0.0:5000->5000/tcp, 0.0.0.0:5044->5044/tcp, 9600/tcp
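In Logstash, your conditionals test the field document_type, but Filebeat is not generating that field, so you need to correct your Filebeat configuration.
Try this configuration for your inputs.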
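filebeat.prospectors:
# One prospector per log type, so the second paths/fields pair
# does not override the first inside a single mapping.
- input_type: log
  paths:
    - logs/sms/*.csv
  fields:
    document_type: sms
- input_type: log
  paths:
    - logs/voip/*.csv
  fields:
    document_type: voip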
This creates a field named fields with a nested field named document_type, as in the example below.
{ "fields" : { "document_type" : "voip" } }
Then change your Logstash conditionals to test the nested field fields.document_type instead, as in the example below.
if [fields][document_type] == "sms" {
  # your filters
}
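Applied to the pipeline from the question, the change could look like the sketch below. It assumes the csv options, hosts, and index names stay exactly as posted; only the sms filter branch is shown, and the voip branch would follow the same pattern with its own column list.

```conf
filter {
  # Custom Filebeat fields arrive nested under [fields] by default
  if [fields][document_type] == "sms" {
    csv {
      columns => ['Date', 'Time', 'PLAN', 'CALL_TYPE', 'MSIDN', 'IMSI', 'IMEI']
      separator => " "
      skip_empty_columns => true
      quote_char => "'"
    }
  }
}
output {
  # Route each type to its own index using the same nested field
  if [fields][document_type] == "sms" {
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      index => "smscdr_index"
    }
  } else if [fields][document_type] == "voip" {
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      index => "voipcdr_index"
    }
  }
}
```

The filter and output conditionals have to agree on the field reference; updating only one of them would still leave events unparsed or unrouted.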
If you prefer, you can set the Filebeat option fields_under_root: true to create document_type at the root of the document, so you do not need to change your Logstash conditionals.
filebeat.prospectors:
- input_type: log
  paths:
    - logs/sms/*.csv
  fields:
    document_type: sms
  fields_under_root: true
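One way to confirm what Filebeat actually sends is a temporary, unconditional stdout output. In the question's pipeline both stdout outputs sit inside the document_type conditionals, so an event that fails the check prints nothing at all. A minimal sketch, assuming it is added to logstash.conf next to the existing output section and removed once debugging is done:

```conf
output {
  # Debug only: print every event, whatever its type, so the
  # real location of document_type is visible in the Logstash log
  stdout {
    codec => rubydebug
  }
}
```

If events show up here with document_type under fields, or at the root when fields_under_root is set, the conditionals can be matched to that layout.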