How do I use FileBeat to send log data in pipe separated format to Elasticsearch in JSON format?

The log file I am monitoring contains entries in the following format:

Oct 23 16:06:44 server smbd_audit: user01|192.168.0.23|project|opendir|ok|.
Oct 23 16:06:44 server smbd_audit: user01|192.168.0.23|project|closedir|ok|
Oct 23 16:06:44 server smbd_audit: user01|192.168.0.23|project|open|ok|r|file.txt
Oct 23 16:06:44 server smbd_audit: user01|192.168.0.23|project|pread|ok|file.txt
Oct 23 16:06:44 server smbd_audit: user01|192.168.0.23|project|close|ok|file.txt

How can I format this data before using FileBeat to send it to Elasticsearch?

I would like my documents to look like the following (excluding the Elasticsearch metadata fields):

{
  "timestamp": "Oct 23 16:06:44",
  "machine-name": "server",
  "type": "smbd_audit",
  "username": "user01",
  "machine-ip": "192.168.0.23",
  "directory": "project",
  "operation": "opendir",
  "success": "ok",
  "file": "file.txt"
}

I'm assuming you don't want to use Logstash, in which case you can use an ingest pipeline with a Grok processor. Note that the literal pipes in the log line have to be escaped in the pattern (| is the regex alternation operator), and the date processor requires a formats list:

PUT _ingest/pipeline/my-pipeline
{
  "description": "My Ingest Pipeline",
  "processors": [
    {
      "grok": {
        "field": "message",
        "patterns": [
          "%{SYSLOGTIMESTAMP:log_date} %{WORD:machine-name} %{WORD:type}: %{WORD:username}\\|%{IP:machine-ip}\\|%{WORD:directory}\\|%{WORD:operation}\\|%{WORD:success}\\|%{GREEDYDATA:file}"
        ]
      }
    },
    {
      "date": {
        "field": "log_date",
        "target_field": "timestamp",
        "formats": ["MMM d HH:mm:ss", "MMM dd HH:mm:ss"]
      }
    }
  ]
}

Completely untested, but it should at least give you something to go on. I used %{GREEDYDATA:file} for the last field because %{WORD} would not match names like file.txt (or the bare . in the opendir line), and be aware that lines such as the open operation carry an extra mode field (r), so they may need a second pattern in the patterns array. Also, since the syslog timestamp has no year, the date processor will assume the current year.
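
Since this is untested, one way to check the pipeline before wiring up FileBeat is the simulate API, which runs sample documents through the processors without indexing anything. A minimal sketch, feeding one of your log lines in as the message field:

POST _ingest/pipeline/my-pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "message": "Oct 23 16:06:44 server smbd_audit: user01|192.168.0.23|project|opendir|ok|."
      }
    }
  ]
}

The response shows the document as it would be indexed, so you can see immediately whether the grok pattern matched and which fields came out.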
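
On the FileBeat side, the pipeline then only needs to be referenced in the Elasticsearch output via the pipeline setting. A minimal filebeat.yml sketch, assuming a recent Filebeat (6.x or later); the log path and host are placeholders you would adapt:

filebeat.inputs:
  - type: log
    paths:
      - /var/log/samba/audit.log   # hypothetical path to your smbd_audit log
output.elasticsearch:
  hosts: ["localhost:9200"]
  pipeline: my-pipeline            # run every event through the ingest pipeline above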