尝试通过 Logstash 传递二进制文件
Trying to pass binary files through Logstash
一些进程正在生成我的 Kafka 二进制文件(来自 Java 它以 bytearray 的形式出现)。
我正在尝试使用 Logstash 从 Kafka 消费并将文件上传到 s3。
我的管道:
input {
kafka {
bootstrap_servers => "my-broker:9092"
topic => "my-topic"
partition_assignment_strategy => "org.apache.kafka.clients.consumer.StickyAssignor"
value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
}
}
filter {
mutate {
remove_field => ["@timestamp", "host"]
}
}
output {
s3 {
region => "eu-west-1"
bucket => "my_bucket"
time_file => 1
prefix => "files/"
rotation_strategy => "time"
}
}
如您所见,我使用了不同的解串器 class。但是,Logstash 似乎默认使用将字节数组转换为字符串的编码。我的目标是按原样将文件上传到 s3。是否有任何已知的编解码器不对输入数据做任何事情并按原样上传?
现在文件已上传到s3,但我无法读取或打开它们。二进制内容被 Logstash 以某种方式破坏了。例如 - 我尝试发送一个包含多个文件的 gzip,但之后我无法在 s3 中打开它。
我在 Logstash 上收到的警告:
0-06-02T10:49:29,149][WARN ][logstash.codecs.plain ][my_pipeline] Received an event that has a different character encoding than you configured. {:text=>"7z\xBC\xAF'\u001C\u0000\u0002\xA6j<........more binary data", :expected_charset=>"UTF-8"}
我认为您并不真正了解 logstash 的用途。
顾名思义,log-stash 它用于 streaming ascii 类型的文件,使用 EOL 定界符在不同的日志事件之间进行延迟。
我确实找到了社区开发的用于从 Kafka 主题读取数据的 kafkaBeat,有 2 个选项:
- kafkabeat - 从 Kafka 主题读取数据。
- kafkabeat2 - 从 Kafka 主题读取数据(json 或纯文本)。
我没有测试自己的那些,但对那些使用 S3 输出选项可能会成功。如果尚不支持 S3 选项,您可以自行开发并将其开源,以便每个人都可以享受它:-)
我不确定 Logstash 是否最适合传递二进制数据,我最终实现了一个 Java 消费者,但以下解决方案对我适用于 Logstash:
- 发送到Kafka的数据可以序列化为二进制数据。为了
例如,我使用 filebeat 发送二进制数据,所以如果 Kafka 的
输出模块有一个名为 "value_serializer" 的参数
它应该设置为
"org.apache.kafka.common.serialization.ByteArraySerializer"
- 在您的 Logstash 设置(kafka 输入)中定义
value_deserializer_class 至
"org.apache.kafka.common.serialization.ByteArrayDeserializer"刚刚
正如我在 post
中所做的那样
- 您在 logstash 中的输出可以是任何可以获取二进制数据的资源。
请注意,输出将获得二进制数据,您需要对其进行反序列化。
一些进程正在生成我的 Kafka 二进制文件(来自 Java 它以 bytearray 的形式出现)。
我正在尝试使用 Logstash 从 Kafka 消费并将文件上传到 s3。
我的管道:
input {
kafka {
bootstrap_servers => "my-broker:9092"
topic => "my-topic"
partition_assignment_strategy => "org.apache.kafka.clients.consumer.StickyAssignor"
value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
}
}
filter {
mutate {
remove_field => ["@timestamp", "host"]
}
}
output {
s3 {
region => "eu-west-1"
bucket => "my_bucket"
time_file => 1
prefix => "files/"
rotation_strategy => "time"
}
}
如您所见,我使用了不同的解串器 class。但是,Logstash 似乎默认使用将字节数组转换为字符串的编码。我的目标是按原样将文件上传到 s3。是否有任何已知的编解码器不对输入数据做任何事情并按原样上传?
现在文件已上传到s3,但我无法读取或打开它们。二进制内容被 Logstash 以某种方式破坏了。例如 - 我尝试发送一个包含多个文件的 gzip,但之后我无法在 s3 中打开它。
我在 Logstash 上收到的警告:
0-06-02T10:49:29,149][WARN ][logstash.codecs.plain ][my_pipeline] Received an event that has a different character encoding than you configured. {:text=>"7z\xBC\xAF'\u001C\u0000\u0002\xA6j<........more binary data", :expected_charset=>"UTF-8"}
我认为您并不真正了解 logstash 的用途。
顾名思义,log-stash 它用于 streaming ascii 类型的文件,使用 EOL 定界符在不同的日志事件之间进行延迟。
我确实找到了社区开发的用于从 Kafka 主题读取数据的 kafkaBeat,有 2 个选项:
- kafkabeat - 从 Kafka 主题读取数据。
- kafkabeat2 - 从 Kafka 主题读取数据(json 或纯文本)。
我没有测试自己的那些,但对那些使用 S3 输出选项可能会成功。如果尚不支持 S3 选项,您可以自行开发并将其开源,以便每个人都可以享受它:-)
我不确定 Logstash 是否最适合传递二进制数据,我最终实现了一个 Java 消费者,但以下解决方案对我适用于 Logstash:
- 发送到Kafka的数据可以序列化为二进制数据。为了 例如,我使用 filebeat 发送二进制数据,所以如果 Kafka 的 输出模块有一个名为 "value_serializer" 的参数 它应该设置为 "org.apache.kafka.common.serialization.ByteArraySerializer"
- 在您的 Logstash 设置(kafka 输入)中定义 value_deserializer_class 至 "org.apache.kafka.common.serialization.ByteArrayDeserializer"刚刚 正如我在 post 中所做的那样
- 您在 logstash 中的输出可以是任何可以获取二进制数据的资源。
请注意,输出将获得二进制数据,您需要对其进行反序列化。