Logstash: output different fields to different Elasticsearch indices
I have a Filebeat instance sending Apache access logs to Logstash. The Logstash pipeline transforms the file and loads the processed fields (field1, field2 and field3) into Elasticsearch, into the index indexA. The flow is simple and works. This is my pipeline.conf:
    input {
      beats {
        port => "5043"
      }
    }
    filter {
      grok {
        patterns_dir => ["/usr/share/logstash/patterns"]
        match => { "message" => ["%{IPORHOST:[client_ip]} - %{DATA:[user_name]} \[%{HTTPDATE:[access_time]}\] \"%{WORD:[method]} %{DATA:[url]} HTTP/%{NUMBER:[http_version]}\" %{NUMBER:[response_code]} %{NUMBER:[bytes]}( \"%{DATA:[referrer]}\")?( \"%{DATA:[user_agent]}\")?",
                                 "%{IPORHOST:[remote_ip]} - %{DATA:[user_name]} \[%{HTTPDATE:[time]}\] \"-\" %{NUMBER:[response_code]} -"] }
        remove_field => ["@version", "beat", "input_type", "source", "type", "tags", "http_version", "@timestamp", "message"]
      }
      mutate {
        add_field => {
          "field1" => "%{access_time}"
          "field2" => "%{host}"
          "field3" => "%{read_timestamp}"
        }
      }
    }
    output {
      elasticsearch {
        hosts => ["localhost:9200"]
        index => "indexA"
      }
    }
Now what I want to do is add three more fields, field4 and field5, and put them into a separate index called indexB. So in the end indexA holds field1, field2 and field3, while indexB holds field4 and field5.
So far this is the modified pipeline.conf, which does not seem to work:
    input {
      beats {
        port => "5043"
      }
    }
    filter {
      grok {
        patterns_dir => ["/usr/share/logstash/patterns"]
        match => { "message" => ["%{IPORHOST:[client_ip]} - %{DATA:[user_name]} \[%{HTTPDATE:[access_time]}\] \"%{WORD:[method]} %{DATA:[url]} HTTP/%{NUMBER:[http_version]}\" %{NUMBER:[response_code]} %{NUMBER:[bytes]}( \"%{DATA:[referrer]}\")?( \"%{DATA:[user_agent]}\")?",
                                 "%{IPORHOST:[remote_ip]} - %{DATA:[user_name]} \[%{HTTPDATE:[time]}\] \"-\" %{NUMBER:[response_code]} -"] }
        remove_field => ["@version", "beat", "input_type", "type", "http_version", "@timestamp", "message"]
      }
      mutate {
        add_field => {
          "field1" => "%{access_time}"
          "field2" => "%{host}"
          "field3" => "%{read_timestamp}"
        }
      }
    }
    output {
      elasticsearch {
        hosts => ["localhost:9200"]
        index => "indexA"
      }
    }
    filter {
      mutate {
        add_field => {
          "field4" => "%{source}"
          "field5" => "%{tags}"
        }
        remove_field => ["field1", "field2", "field3"]
      }
    }
    output {
      elasticsearch {
        hosts => ["localhost:9200"]
        index => "indexB"
      }
    }
Can someone point out where I am going wrong, or suggest any alternative approach?
You need to use the clone filter to duplicate your events. Then you can add the desired fields to each respective event and send them to two different ES indices:
    input {
      beats {
        port => "5043"
      }
    }
    filter {
      grok {
        patterns_dir => ["/usr/share/logstash/patterns"]
        match => { "message" => ["%{IPORHOST:[client_ip]} - %{DATA:[user_name]} \[%{HTTPDATE:[access_time]}\] \"%{WORD:[method]} %{DATA:[url]} HTTP/%{NUMBER:[http_version]}\" %{NUMBER:[response_code]} %{NUMBER:[bytes]}( \"%{DATA:[referrer]}\")?( \"%{DATA:[user_agent]}\")?",
                                 "%{IPORHOST:[remote_ip]} - %{DATA:[user_name]} \[%{HTTPDATE:[time]}\] \"-\" %{NUMBER:[response_code]} -"] }
        remove_field => ["@version", "beat", "input_type", "type", "http_version", "@timestamp", "message"]
      }
      # One clone per target index. Each clone gets [type] set to its clone name
      # (with ECS compatibility disabled); the original, untyped event keeps
      # flowing through the pipeline as well.
      clone {
        clones => ["log1", "log2"]
      }
      if [type] == "log1" {
        mutate {
          add_field => {
            "field1" => "%{access_time}"
            "field2" => "%{host}"
            "field3" => "%{read_timestamp}"
          }
        }
      } else if [type] == "log2" {
        mutate {
          add_field => {
            "field4" => "%{source}"
            "field5" => "%{tags}"
          }
        }
      } else {
        # Drop the untyped original; without this it would also land in indexB
        # and every access-log line would be indexed there twice.
        drop { }
      }
    }
    output {
      if [type] == "log1" {
        elasticsearch {
          hosts => ["localhost:9200"]
          index => "indexA"
        }
      } else {
        elasticsearch {
          hosts => ["localhost:9200"]
          index => "indexB"
        }
      }
    }
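Why the two-filter, two-output version fails and the clone version works comes down to how Logstash assembles a pipeline: every filter section in the file is concatenated into one filter chain and every output section into one output stage, so each event passes through all filters before any output sees it. The following Python model of that behaviour is an added example, not Logstash code; the clone function and the SAMPLE event are simplified stand-ins for the clone filter and for what grok leaves on an event.

```python
from copy import deepcopy

def clone(event, names):
    # clone filter: original passes through, plus one copy per name with [type]
    # set to that name (ECS compatibility disabled, as in the question's Logstash)
    return [event] + [dict(event, type=n) for n in names]

def run_pipeline(filters, outputs, events):
    """All filter sections run over every event in file order; only then does
    every output section see each surviving event. A filter returns a list so it
    can emit clones (several events) or drop (no events)."""
    for f in filters:
        events = [out for e in events for out in f(e)]
    indices = {}
    for e in events:
        for route in outputs:
            indices.setdefault(route(e), []).append(e)
    return indices

# Constructed sample: roughly what grok leaves behind for one access-log line.
SAMPLE = {"access_time": "10/Oct/2000:13:55:36 -0700", "host": "web01", "tags": [],
          "read_timestamp": "2017-01-01T00:00:00Z", "source": "/var/log/apache2/access.log"}

def broken_config():
    """The question's second pipeline.conf: two filter sections, then two
    unconditional elasticsearch outputs."""
    def add_123(e):
        e.update(field1=e["access_time"], field2=e["host"], field3=e["read_timestamp"])
        return [e]
    def add_45_drop_123(e):
        e.update(field4=e["source"], field5=e["tags"])
        for k in ("field1", "field2", "field3"):
            e.pop(k, None)
        return [e]
    return run_pipeline([add_123, add_45_drop_123],
                        [lambda e: "indexA", lambda e: "indexB"], [deepcopy(SAMPLE)])

def clone_config():
    """The answer's pipeline: clone once per index, enrich and route on [type],
    and drop the untyped original so indexB does not receive it as well."""
    def enrich(e):
        if e.get("type") == "log1":
            e.update(field1=e["access_time"], field2=e["host"], field3=e["read_timestamp"])
        elif e.get("type") == "log2":
            e.update(field4=e["source"], field5=e["tags"])
        else:
            return []  # drop {} for the original event
        return [e]
    route = lambda e: "indexA" if e["type"] == "log1" else "indexB"
    return run_pipeline([lambda e: clone(e, ["log1", "log2"]), enrich], [route], [deepcopy(SAMPLE)])
```

Running broken_config() shows both indices receiving the same document, carrying field4 and field5 and lacking field1 to field3; clone_config() yields exactly one document per index with the intended fields.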