Logstash - csv 输出 headers
Logstash - csv output headers
我正在尝试使用 logstash jdbc plugins and returns a csv output file with headers with logstash csv plugin 请求数据库。
我花了很多时间在 logstash 文档上,但我仍然遗漏了一点。
使用以下 logstash 配置,结果为我提供了一个文件,每行包含 headers。我找不到只为 logstash 配置中的第一行添加 headers 的方法。
非常感谢帮助。
输出文件
_object$id;_object$name;_object$type;nb_surveys;csat_score
2;Jeff Karas;Agent;2;2
_object$id;_object$name;_object$type;nb_surveys;csat_score
3;John Lafer;Agent;2;2;2;2;;2
_object$id;_object$name;_object$type;nb_surveys;csat_score
4;Michele Fisher;Agent;2;2
_object$id;_object$name;_object$type;nb_surveys;csat_score
5;Chad Hendren;Agent;2;78
文件:simple-out.conf
input {
jdbc {
jdbc_connection_string => "jdbc:postgresql://localhost:5432/postgres"
jdbc_user => "postgres"
jdbc_password => "postgres"
jdbc_driver_library => "/tmp/drivers/postgresql/postgresql_jdbc.jar"
jdbc_driver_class => "org.postgresql.Driver"
statement_filepath => "query.sql"
}
}
output {
csv {
fields => ["_object$id","_object$name","_object$type","nb_surveys","csat_score"]
path => "output/%{team}/output-%{team}.%{+yyyy.MM.dd}.csv"
csv_options => {
"write_headers" => true
"headers" =>["_object$id","_object$name","_object$type","nb_surveys","csat_score"]
"col_sep" => ";"
}
}
}
谢谢
你在输出中得到多个 headers 的原因是因为 Logstash 没有事件之间 global/shared 状态的概念,每个项目都是单独处理的,所以每次 CSV 输出插件 运行s 它的行为类似于第一个并写入 headers.
我遇到了同样的问题,并找到了一个解决方案,使用 ruby 过滤器的 init 选项在 logstash startup-time.
中执行一些代码
这是一个示例 logstash 配置:
# csv-headers.conf
input {
stdin {}
}
filter {
ruby {
init => "
begin
@@csv_file = 'output.csv'
@@csv_headers = ['A','B','C']
if File.zero?(@@csv_file) || !File.exist?(@@csv_file)
CSV.open(@@csv_file, 'w') do |csv|
csv << @@csv_headers
end
end
end
"
code => "
begin
event['@metadata']['csv_file'] = @@csv_file
event['@metadata']['csv_headers'] = @@csv_headers
end
"
}
csv {
columns => ["a", "b", "c"]
}
}
output {
csv {
fields => ["a", "b", "c"]
path => "%{[@metadata][csv_file]}"
}
stdout {
codec => rubydebug {
metadata => true
}
}
}
如果您 运行 使用该配置的 Logstash:
echo "1,2,3\n4,5,6\n7,8,9" | ./bin/logstash -f csv-headers.conf
您将得到一个包含以下内容的 output.csv
文件:
A,B,C
1,2,3
4,5,6
7,8,9
这也是 thread-safe,因为它 运行 只是启动时的代码,因此您可以使用多个 worker。
希望对您有所帮助!
我正在使用利用事件日期的动态文件名 (index-YYYY-MM-DD.csv),因此在管道启动时写入 headers 对我来说不是一个可行的选择。
相反,我允许写入重复的 headers 并每隔几分钟设置一个 运行 的 cron 作业并删除所有重复的行并将结果写回同一个文件。
#!/bin/bash -xe
for filename in /tmp/logstash/*.csv; do awk '!v[]++' $filename > $filename.tmp && mv -f $filename.tmp $filename; done
注意:这仅在我提取几百 MB 数据的实例上进行了测试 - 如果您的数据管道每分钟摄取 GB,这可能不是一个可行的选择。
我正在尝试使用 logstash jdbc plugins and returns a csv output file with headers with logstash csv plugin 请求数据库。
我花了很多时间在 logstash 文档上,但我仍然遗漏了一点。
使用以下 logstash 配置,结果为我提供了一个文件,每行包含 headers。我找不到只为 logstash 配置中的第一行添加 headers 的方法。
非常感谢帮助。
输出文件
_object$id;_object$name;_object$type;nb_surveys;csat_score
2;Jeff Karas;Agent;2;2
_object$id;_object$name;_object$type;nb_surveys;csat_score
3;John Lafer;Agent;2;2;2;2;;2
_object$id;_object$name;_object$type;nb_surveys;csat_score
4;Michele Fisher;Agent;2;2
_object$id;_object$name;_object$type;nb_surveys;csat_score
5;Chad Hendren;Agent;2;78
文件:simple-out.conf
input {
jdbc {
jdbc_connection_string => "jdbc:postgresql://localhost:5432/postgres"
jdbc_user => "postgres"
jdbc_password => "postgres"
jdbc_driver_library => "/tmp/drivers/postgresql/postgresql_jdbc.jar"
jdbc_driver_class => "org.postgresql.Driver"
statement_filepath => "query.sql"
}
}
output {
csv {
fields => ["_object$id","_object$name","_object$type","nb_surveys","csat_score"]
path => "output/%{team}/output-%{team}.%{+yyyy.MM.dd}.csv"
csv_options => {
"write_headers" => true
"headers" =>["_object$id","_object$name","_object$type","nb_surveys","csat_score"]
"col_sep" => ";"
}
}
}
谢谢
你在输出中得到多个 headers 的原因是因为 Logstash 没有事件之间 global/shared 状态的概念,每个项目都是单独处理的,所以每次 CSV 输出插件 运行s 它的行为类似于第一个并写入 headers.
我遇到了同样的问题,并找到了一个解决方案,使用 ruby 过滤器的 init 选项在 logstash startup-time.
中执行一些代码这是一个示例 logstash 配置:
# csv-headers.conf
input {
stdin {}
}
filter {
ruby {
init => "
begin
@@csv_file = 'output.csv'
@@csv_headers = ['A','B','C']
if File.zero?(@@csv_file) || !File.exist?(@@csv_file)
CSV.open(@@csv_file, 'w') do |csv|
csv << @@csv_headers
end
end
end
"
code => "
begin
event['@metadata']['csv_file'] = @@csv_file
event['@metadata']['csv_headers'] = @@csv_headers
end
"
}
csv {
columns => ["a", "b", "c"]
}
}
output {
csv {
fields => ["a", "b", "c"]
path => "%{[@metadata][csv_file]}"
}
stdout {
codec => rubydebug {
metadata => true
}
}
}
如果您 运行 使用该配置的 Logstash:
echo "1,2,3\n4,5,6\n7,8,9" | ./bin/logstash -f csv-headers.conf
您将得到一个包含以下内容的 output.csv
文件:
A,B,C
1,2,3
4,5,6
7,8,9
这也是 thread-safe,因为它 运行 只是启动时的代码,因此您可以使用多个 worker。
希望对您有所帮助!
我正在使用利用事件日期的动态文件名 (index-YYYY-MM-DD.csv),因此在管道启动时写入 headers 对我来说不是一个可行的选择。
相反,我允许写入重复的 headers 并每隔几分钟设置一个 运行 的 cron 作业并删除所有重复的行并将结果写回同一个文件。
#!/bin/bash -xe
for filename in /tmp/logstash/*.csv; do awk '!v[]++' $filename > $filename.tmp && mv -f $filename.tmp $filename; done
注意:这仅在我提取几百 MB 数据的实例上进行了测试 - 如果您的数据管道每分钟摄取 GB,这可能不是一个可行的选择。