尝试通过 Logstash 传递二进制文件

Trying to pass binary files through Logstash

一些进程正在生成我的 Kafka 二进制文件(来自 Java 它以 bytearray 的形式出现)。

我正在尝试使用 Logstash 从 Kafka 消费并将文件上传到 s3。

我的管道:

input {
  kafka {
    bootstrap_servers => "my-broker:9092"
    topic => "my-topic"
    partition_assignment_strategy => "org.apache.kafka.clients.consumer.StickyAssignor"
    value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
  }
}
filter {
  mutate {
    remove_field => ["@timestamp", "host"]
  }
}
output {
  s3 {
    region => "eu-west-1"
    bucket => "my_bucket"
    time_file => 1
    prefix => "files/"
    rotation_strategy => "time"
  }
}

如您所见,我使用了不同的解串器 class。但是,Logstash 似乎默认使用将字节数组转换为字符串的编码。我的目标是按原样将文件上传到 s3。是否有任何已知的编解码器不对输入数据做任何事情并按原样上传?

现在文件已上传到s3,但我无法读取或打开它们。二进制内容被 Logstash 以某种方式破坏了。例如 - 我尝试发送一个包含多个文件的 gzip,但之后我无法在 s3 中打开它。

我在 Logstash 上收到的警告:

0-06-02T10:49:29,149][WARN ][logstash.codecs.plain    ][my_pipeline] Received an event that has a different character encoding than you configured. {:text=>"7z\xBC\xAF'\u001C\u0000\u0002\xA6j<........more binary data", :expected_charset=>"UTF-8"}

我认为您并不真正了解 logstash 的用途。

顾名思义,log-stash 它用于 streaming ascii 类型的文件,使用 EOL 定界符在不同的日志事件之间进行延迟。

我确实找到了社区开发的用于从 Kafka 主题读取数据的 kafkaBeat,有 2 个选项:

  • kafkabeat - 从 Kafka 主题读取数据。
  • kafkabeat2 - 从 Kafka 主题读取数据(json 或纯文本)。

我没有测试自己的那些,但对那些使用 S3 输出选项可能会成功。如果尚不支持 S3 选项,您可以自行开发并将其开源,以便每个人都可以享受它:-)

我不确定 Logstash 是否最适合传递二进制数据,我最终实现了一个 Java 消费者,但以下解决方案对我适用于 Logstash:

  1. 发送到Kafka的数据可以序列化为二进制数据。为了 例如,我使用 filebeat 发送二进制数据,所以如果 Kafka 的 输出模块有一个名为 "value_serializer" 的参数 它应该设置为 "org.apache.kafka.common.serialization.ByteArraySerializer"
  2. 在您的 Logstash 设置(kafka 输入)中定义 value_deserializer_class 至 "org.apache.kafka.common.serialization.ByteArrayDeserializer"刚刚 正如我在 post
  3. 中所做的那样
  4. 您在 logstash 中的输出可以是任何可以获取二进制数据的资源。

请注意,输出将获得二进制数据,您需要对其进行反序列化。