Logstash Grok filter pattern for Oracle RDS XML Audit Logs

I want to create a Logstash grok pattern that parses the Oracle audit log below and extracts only the values between <AuditRecord> and </AuditRecord>:

{"messageType":"DATA_MESSAGE","owner":"656565656566","logGroup":"/aws/rds/instance/stg/audit","logStream":"STG_ora_20067_20210906120520144010741320.xml","subscriptionFilters":["All logs"],"logEvents":[{"id":"36370952585791240628335082776414249187626811417307774976","timestamp":1630929920144,"message":<AuditRecord><Audit_Type>8</Audit_Type><EntryId>1</EntryId><Extended_Timestamp>2021-08-31T13:25:20.140969Z</Extended_Timestamp><DB_User>/</DB_User><OS_User>rdsdb</OS_User><Userhost>ip-172-27-1-72</Userhost><OS_Process>6773</OS_Process><Instance_Number>0</Instance_Number><Returncode>0</Returncode><OSPrivilege>SYSDBA</OSPrivilege><DBID>918393906</DBID> <Sql_Text>CONNECT</Sql_Text> </AuditRecord>"}]}

These logs are stored in S3 as gzip (.gz) files. I am using the following Logstash configuration, but it does not work.

input {
    s3 {
        bucket => "s3bucket"
        type => "oracle-audit-log-xml"
        region => "eu-west-1"
    }
}

filter {
    ## For Oracle audit log
    if [type] == "oracle-audit-log-xml" {

        mutate { gsub => [ "message", "[\n]", "" ] }

        grok {
            match => [ "message", "<AuditRecord>%{DATA:temp_audit_message}</AuditRecord>" ]
        }
        mutate {
            add_field => { "audit_message" => "<AuditRecord>%{temp_audit_message}</AuditRecord>" }
        }
        xml {
            store_xml => true
            source => "audit_message"
            target => "audit"
        }
        mutate {
            add_field => { "timestamp" => "%{[audit][Extended_Timestamp]}" }
        }
        date {
            match => [ "timestamp", "yyyy-MM-dd'T'HH:mm:ss.SSSSSS'Z'", "ISO8601" ]
            target => "@timestamp"
        }
        # remove temporary fields
        mutate { remove_field => ["message", "audit_message", "temp_audit_message"] }

        if "_grokparsefailure" in [tags] {
            drop {}
        }
    }
}

output {
    amazon_es {
        hosts => ["elasticsearch url"]
        index => "rdslogs-%{+YYYY.MM.dd}"
        region => "eu-west-1"
        aws_access_key_id => ''
        aws_secret_access_key => ''
    }
}

The following leading part of each record seems to be the problem:

{"messageType":"DATA_MESSAGE","owner":"656565656566","logGroup":"/aws/rds/instance/stg/audit","logStream":"STG_ora_20067_20210906120520144010741320.xml","subscriptionFilters":["All logs"],"logEvents":[{"id":"36370952585791240628335082776414249187626811417307774976","timestamp":1630929920144,"message":

Is there any way to modify the configuration so that this leading part is removed?

Thanks

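You don't need a grok pattern for the outer record, because your logs are in JSON format. Install the Logstash json filter plugin (it is bundled with the default Logstash distribution, so this step is only needed if it is missing).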

$ logstash-plugin install logstash-filter-json

Then add the filter settings below to parse your logs.

filter {
    json {
        source => "message"
    }
}

See the attached screenshot from my local ELK setup, where I tried parsing the log line you provided.
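
The json filter on its own only decodes the outer CloudWatch Logs envelope; the <AuditRecord> XML still sits as a string inside each logEvents entry. A minimal sketch of how the remaining steps might be chained is shown below. It assumes each S3 line is one complete, valid JSON object (the sample as pasted lacks the opening quote before <AuditRecord>, so the real data may differ), and the field name "cw" is a hypothetical choice, not something from the question. It has not been run against the real bucket.

```conf
filter {
    if [type] == "oracle-audit-log-xml" {
        # Decode the CloudWatch Logs envelope into a sub-field.
        # "cw" is a hypothetical name chosen for this sketch.
        json {
            source => "message"
            target => "cw"
        }

        # logEvents is an array: emit one Logstash event per entry.
        split {
            field => "[cw][logEvents]"
        }

        # Parse the XML string carried by the current logEvents entry.
        # force_array => false stores single elements as plain values
        # instead of one-item arrays.
        xml {
            source => "[cw][logEvents][message]"
            target => "audit"
            store_xml => true
            force_array => false
        }

        # Use the audit record's own timestamp as the event time.
        date {
            match => [ "[audit][Extended_Timestamp]", "ISO8601" ]
            target => "@timestamp"
        }

        # Drop the raw envelope once the audit fields are extracted.
        mutate { remove_field => [ "message", "cw" ] }
    }
}
```

With this layout, values such as [audit][DB_User] or [audit][Sql_Text] become regular event fields. Records that fail to decode are tagged _jsonparsefailure by the json filter, which can be used to route or drop them instead of checking for _grokparsefailure.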