Logstash: elasticsearch output and unstructured data
Filebeat.yml file:
filebeat.inputs:
- type: log
  paths:
    - C:\Program Files\Filebeat\test_logs\*\*\*\*.txt
  exclude_lines: ['^Infobase.+']
output.logstash:
  hosts: ["localhost:5044"]
  worker: 1
Filebeat collects logs from a folder structure like this:
C:\Program Files\Filebeat\test_logs\*\*\*\*.txt
There are many folders here, and each leaf folder holds at least a few log files.
Example log file (timestamps can coincide across several log files, because the logs come from different users):
"03.08.2020 10:56:38","Event LClick","Type Menu","t=0","beg"
"03.08.2020 10:56:38","Event LClick","Type Menu","Detail SomeDetail","t=109","end"
"03.08.2020 10:56:40","Event LClick","t=1981","beg"
"03.08.2020 10:56:40","Event LClick","t=2090","end"
"03.08.2020 10:56:41","Event LClick","Type ToolBar","t=3026","beg"
"03.08.2020 10:56:43","Event LClick","Type ToolBar","Detail User_Desktop","t=4477","end"
"03.08.2020 10:56:44","Event FormActivate","Name Form_Name:IsaA","t=5444"
"03.08.2020 10:56:51","Event LClick","t=12543","beg"
"03.08.2020 10:56:51","Event LClick","t=12605","end"
"03.08.2020 10:56:52","Event LClick","Form ","Type Label","Name Application.for.training","t=13853","beg"
"03.08.2020 10:57:54","Event LClick","Form Application.for.training","Type Label","Name Application.for.training","t=75442","end"
"03.08.2020 10:57:54","Event FormActivate","Name List.form","t=75785"
"03.08.2020 10:58:04","Event Wheel","Form List.form","Type FormTable","Name Список","t=85769","beg"
"03.08.2020 10:58:04","Event Wheel","Form List.form","Type FormTable","Name Список","t=85769","end"
"03.08.2020 10:58:04","Event Wheel","Form List.form","Type FormTable","Name Список","t=85847","beg"
"03.08.2020 10:58:04","Event Wheel","Form List.form","Type FormTable","Name Список","t=85847","end"
"03.08.2020 10:58:04","Event Wheel","Form List.form","Type FormTable","Name Список","t=85879","beg"
"03.08.2020 10:58:04","Event Wheel","Form List.form","Type FormTable","Name Список","t=85879","end"
"03.08.2020 10:58:04","Event Wheel","Form List.form","Type FormTable","Name Список","t=85925","beg"
"03.08.2020 10:58:04","Event Wheel","Form List.form","Type FormTable","Name Список","t=85925","end"
"03.08.2020 10:58:08","Event LClick","Form List.form","Type FormTable","Name Список","t=89373","beg"
"03.08.2020 10:58:08","Event LClick","Form List.form","Type FormTable","Name Список","Detail Data","t=89451","end"
"03.08.2020 10:58:15","Event LClick","Form List.form","Type FormTable","Name Список","t=96580","beg"
"03.08.2020 10:58:15","Event LClick","Form List.form","Type FormTable","Name Список","Detail Data","t=96643","end"
Logstash config file:
input {
  beats {
    port => '5044'
  }
}
filter {
  grok {
    patterns_dir => ['./patterns']
    match => { 'message' => '%{TIME:timestamp}(","Event\s)(?<Event>([^"]+))(","Form\s)?(?<Form>([^"]+))?(","ParentType\s)?(?<parent_type>([^"]+))?(","ParentName\s)?(?<parent_name>([^"]+))?(","Type\s)?(?<type>([^"]+))?(","Name\s)?(?<Name_of_form>([^"]+))?(","Detail\s)?(?<Detail>([^"]+))?(","t=)?(?<t>([\d]+))?(",")?(?<Status>(end|beg))?' }
    add_tag => [ '%{Status}' ]
  }
  dissect {
    mapping => {
      '[log][file][path]' => 'C:\Program Files\Filebeat\test_logs\%{somethingtoo}\%{something}\%{User_Name}\%{filename}.txt'
    }
  }
  date {
    match => [ 'timestamp', 'dd.MM.yyyy HH:mm:ss' ]
  }
  elapsed {
    unique_id_field => 'Event'
    start_tag => 'beg'
    end_tag => 'end'
    new_event_on_match => false
  }
  if 'elapsed' in [tags] {
    aggregate {
      task_id => '%{Event}'
      code => 'map["duration"] = [(event.get("elapsed_time")*1000).to_i]'
      map_action => 'create'
    }
  }
  mutate {
    remove_field => ['timestamp', 'ecs', 'log', 'tags', 'message', '@version', 'something', 'somethingtoo', 'filename', 'input', 'host', 'agent', 't', 'parent_type', 'parent_name', 'type']
    rename => {'elapsed_time' => 'Event_duration'}
  }
}
output {
  elasticsearch {
    hosts => ['localhost:9200']
    index => 'test'
  }
}
In my logstash.conf I use the aggregate filter, so I run Logstash with a single pipeline worker (-w 1) for it to work correctly.
While I was testing and tuning the configuration against a single log file, -w 1 was set and everything worked fine. But as soon as I started collecting all the logs from every directory, the problems began.
The data does not end up in elasticsearch correctly (this is obvious from the strange numbers the aggregation produces).
I also tried setting worker: 1 in the logstash output section of filebeat.yml, but that did not help either.
Questions:
- Do you perhaps know how to solve this? It is strange that everything works with one log file, or with several log files in a single directory, and suddenly breaks when more directories are added.
- If I understand correctly, elasticsearch has indices and types. Every log line has a timestamp and the name of the user it belongs to, so perhaps I should place the data into an index by log time and a type by user name, so that logs from different users with the same timestamps do not overlap. How should I implement this? Searching for information, I only found material about document_type, which is deprecated.
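(For context: since mapping types were removed, the usual substitute is a dynamic index name. A minimal sketch of what that could look like in the elasticsearch output above, assuming the User_Name field extracted by the dissect filter contains only lowercase characters, since index names must be lowercase:)
output {
  elasticsearch {
    hosts => ['localhost:9200']
    # one index per user and day, e.g. test-ivanov-2020.08.03
    index => 'test-%{User_Name}-%{+YYYY.MM.dd}'
  }
}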
The field you use in elapsed and aggregate is not unique: the Event field can have the same value in different files, which lets the elapsed filter pair a start event from one file with an end event from another file.
This happens because filebeat harvesters process files in parallel and ship them to logstash in batches. The worker option in your configuration does not help here; it controls the number of workers that send the data, not the ones that collect it.
You can try the harvester_limit: 1 option to limit the number of parallel harvesters, but that slows down your data processing, and there is no guarantee it will not still confuse your filters. Also, Filebeat does not guarantee event order, only at-least-once delivery.
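(For reference, harvester_limit is set per input in filebeat.yml; a minimal sketch based on the input above:)
filebeat.inputs:
- type: log
  paths:
    - C:\Program Files\Filebeat\test_logs\*\*\*\*.txt
  exclude_lines: ['^Infobase.+']
  # read at most one file at a time for this input
  harvester_limit: 1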
The best solution is to create a unique field by concatenating the Event field with the filename field, so that events from different files cannot be mixed up.
You can do that by adding a mutate filter before the elapsed filter:
mutate {
  add_field => { "uniqueEvent" => "%{Event}_%{filename}" }
}
This creates a field named uniqueEvent with values like Lclick_filename; you then use this new field in the elapsed and aggregate filters.
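(A minimal sketch of how the elapsed and aggregate blocks from the configuration above might look after the change:)
elapsed {
  # pair beg/end events per file instead of globally
  unique_id_field => 'uniqueEvent'
  start_tag => 'beg'
  end_tag => 'end'
  new_event_on_match => false
}
if 'elapsed' in [tags] {
  aggregate {
    # aggregate by the same per-file id
    task_id => '%{uniqueEvent}'
    code => 'map["duration"] = [(event.get("elapsed_time")*1000).to_i]'
    map_action => 'create'
  }
}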
If the same file name can occur in different folders, you will need to add another field from the path until the value of uniqueEvent becomes unique.
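(For example, a sketch that also mixes in the User_Name folder extracted by the dissect filter, assuming the user-level folder is what distinguishes the colliding paths:)
mutate {
  # include the per-user folder so identical file names in
  # different user directories still produce distinct ids
  add_field => { "uniqueEvent" => "%{Event}_%{User_Name}_%{filename}" }
}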