具有多个kafka主题输入的logstash
logstash with multiple kafka topic input
我想使用以下设置启动 logstash 实例:
input {
kafka {
topic_id => "topic_a"
.......
}
kafka {
topic_id => "topic_b"
.......
}
}
filter {
json {
source => "message"
}
uuid {
target => "@uuid"
}
mutate {
replace => { "message" => "%{message}" } # want to get the full json literal but does not work
add_field => {
"topic" => "%{topic_id}" # it does not work either
}
}
# logic to apply different filter base on topic_id
if [topic_id] =~ 'topic_a' { # this block seems never entered
mutate {
replace => { "topic" => "topic_a" }
}
} else {
.....
}
}
output {
.....
}
我的 Kibana 上的输出应该如下所示:
topic : %{topic_id}
提示上面的配置无法提取topic_id。
我不知道如何配置过滤器部分。任何人都可以对此给出提示吗?谢谢
顺便说一句,我正在使用 logstash-2.2.2
编辑:根据logstash文档更新配置,结果还是一样
add_field 的 documentation 显示的语法与您正在使用的语法不同。你可以试试。
filter {
mutate {
add_field => { "foo_%{somefield}" => "Hello world, from %{host}" }
}
}
默认情况下,Kafka 输入插件不包含元数据信息,例如:topic_id ..
您必须启用 decorate_events
选项:
kafka {
topic_id => "topic_a"
decorate_events => true
}
完成后,您可以在 kafka
数组中找到您的 topic_id topic
键。
decorate_events
Value type is boolean Default value is false Option to add Kafka
metadata like topic, message size to the event. This will add a field
named kafka to the logstash event containing the following attributes:
msg_size: The complete serialized size of this message in bytes
(including crc, header attributes, etc) topic: The topic this message
is associated with consumer_group: The consumer group used to read in
this event partition: The partition this message is associated with
key: A ByteBuffer containing the message key
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html#plugins-inputs-kafka-decorate_events
我想使用以下设置启动 logstash 实例:
input {
kafka {
topic_id => "topic_a"
.......
}
kafka {
topic_id => "topic_b"
.......
}
}
filter {
json {
source => "message"
}
uuid {
target => "@uuid"
}
mutate {
replace => { "message" => "%{message}" } # want to get the full json literal but does not work
add_field => {
"topic" => "%{topic_id}" # it does not work either
}
}
# logic to apply different filter base on topic_id
if [topic_id] =~ 'topic_a' { # this block seems never entered
mutate {
replace => { "topic" => "topic_a" }
}
} else {
.....
}
}
output {
.....
}
我的 Kibana 上的输出应该如下所示:
topic : %{topic_id}
提示上面的配置无法提取topic_id。 我不知道如何配置过滤器部分。任何人都可以对此给出提示吗?谢谢
顺便说一句,我正在使用 logstash-2.2.2
编辑:根据logstash文档更新配置,结果还是一样
add_field 的 documentation 显示的语法与您正在使用的语法不同。你可以试试。
filter {
mutate {
add_field => { "foo_%{somefield}" => "Hello world, from %{host}" }
}
}
默认情况下,Kafka 输入插件不包含元数据信息,例如:topic_id ..
您必须启用 decorate_events
选项:
kafka {
topic_id => "topic_a"
decorate_events => true
}
完成后,您可以在 kafka
数组中找到您的 topic_id topic
键。
decorate_events
Value type is boolean Default value is false Option to add Kafka metadata like topic, message size to the event. This will add a field named kafka to the logstash event containing the following attributes: msg_size: The complete serialized size of this message in bytes (including crc, header attributes, etc) topic: The topic this message is associated with consumer_group: The consumer group used to read in this event partition: The partition this message is associated with key: A ByteBuffer containing the message key https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html#plugins-inputs-kafka-decorate_events