通过 Serilog UDP 接收器和 logstash 的 AWS Lambda 日志记录静默失败

AWS Lambda logging through Serilog UDP sink and logstash silently fails

我们有一个 .NET Core 2.1 AWS Lambda,我正试图将其连接到我们现有的日志系统中。

我正在尝试使用 UDP 接收器通过 Serilog 登录到我们的 logstash 实例,以便摄取到托管在私有 VPC 上的 ElasticSearch 日志数据库。 运行 通过控制台在本地记录良好,既可以记录到控制台本身,也可以通过 UDP 记录到 Elastic。但是,当它作为 lambda 运行时,它只会记录到控制台(即 CloudWatch),并且不会输出任何表明任何错误的信息。可能是因为 UDP 是无状态的?

NuGet 包和版本:

这是我们使用的日志记录代码:

        public static void Configure(string udpHost, int udpPort, string environment)
        {
            var udpFormatter = new JsonFormatter(renderMessage: true);

            var loggerConfig = new LoggerConfiguration()
                .Enrich.FromLogContext()
                .MinimumLevel.Information()
                .Enrich.WithProperty("applicationName", Assembly.GetExecutingAssembly().GetName().Name)
                .Enrich.WithProperty("applicationVersion", Assembly.GetExecutingAssembly().GetName().Version.ToString())
                .Enrich.WithProperty("tags", environment);

                loggerConfig
                    .WriteTo.Console(outputTemplate: "[{Level:u}]: {Message}{N---ewLine}{Exception}")
                    .WriteTo.Udp(udpHost, udpPort, udpFormatter);

              var logger = loggerConfig.CreateLogger();
              Serilog.Log.Logger = logger;
              Serilog.Debugging.SelfLog.Enable(Console.Error);
        }
          // this is output in the console from the lambda, but doesn't appear in the Database from the lambda
          // when run locally, appears in both
          Serilog.Log.Logger.Information("Hello from Serilog!");
          ...
          // at end of lambda
          Serilog.Log.CloseAndFlush();

这是我们在 logstash 上的 UDP 输入:

     udp {                                                                                                                                                                                                                                    
         port => 5000
         tags => [ 'systest', 'serilog-nested' ]                                                                                                                                                                                              
         codec => json                                                                                                                                                                                                                        
     }                                                                                                                                                                                                                                        

有谁知道我该如何解决这个问题?或者甚至只是看看具体有什么问题,这样我就可以开始寻找解决方案。

到目前为止尝试过的事情包括:

根据我上面的评论,您可以像这样创建一个日志订阅并流式传输到 ES,我知道这是 NodeJS,所以它不是正确的答案,但您可以从这里找出答案:

/* eslint-disable */
// Eslint disabled as this is adapted AWS code.

const zlib = require('zlib')
const { Client } = require('@elastic/elasticsearch')
const elasticsearch = new Client({ ES_CLUSTER_DETAILS })
/**
 * This is an example function to stream CloudWatch logs to ElasticSearch.
 * @param event
 * @param context
 * @param callback
 */
export default (event, context, callback) => {
    context.callbackWaitsForEmptyEventLoop = true

    const payload = new Buffer(event.awslogs.data, 'base64')

    zlib.gunzip(payload, (err, result) => {

        if (err) {
            return callback(null, err)
        }

        const logObject = JSON.parse(result.toString('utf8'))

        const elasticsearchBulkData = transform(logObject)

        const params = { body: [] }
        params.body.push(elasticsearchBulkData)

        esClient.bulk(params, (err, resp) => {

            if (err) {
        callback(null, 'success')
        return
    }

        })

        callback(null, 'success')
    })
}

function transform(payload) {
    if (payload.messageType === 'CONTROL_MESSAGE') {
        return null
    }

    let bulkRequestBody = ''

    payload.logEvents.forEach((logEvent) => {
        const timestamp = new Date(1 * logEvent.timestamp)

        // index name format: cwl-YYYY.MM.DD
        const indexName = [
            `cwl-${process.env.NODE_ENV}-${timestamp.getUTCFullYear()}`,              // year
            (`0${timestamp.getUTCMonth() + 1}`).slice(-2),  // month
            (`0${timestamp.getUTCDate()}`).slice(-2),          // day
        ].join('.')

        const source = buildSource(logEvent.message, logEvent.extractedFields)
        source['@id'] = logEvent.id
        source['@timestamp'] = new Date(1 * logEvent.timestamp).toISOString()
        source['@message'] = logEvent.message
        source['@owner'] = payload.owner
        source['@log_group'] = payload.logGroup
        source['@log_stream'] = payload.logStream

        const action = { index: {} }
        action.index._index = indexName
        action.index._type = 'lambdaLogs'
        action.index._id = logEvent.id

        bulkRequestBody += `${[
            JSON.stringify(action),
            JSON.stringify(source),
        ].join('\n')}\n`
    })
    return bulkRequestBody
}

function buildSource(message, extractedFields) {
    if (extractedFields) {
        const source = {}

        for (const key in extractedFields) {
            if (extractedFields.hasOwnProperty(key) && extractedFields[key]) {
                const value = extractedFields[key]

                if (isNumeric(value)) {
                    source[key] = 1 * value
                    continue
                }

                const jsonSubString = extractJson(value)
                if (jsonSubString !== null) {
                    source[`$${key}`] = JSON.parse(jsonSubString)
                }

                source[key] = value
            }
        }
        return source
    }

    const jsonSubString = extractJson(message)
    if (jsonSubString !== null) {
        return JSON.parse(jsonSubString)
    }

    return {}
}

function extractJson(message) {
    const jsonStart = message.indexOf('{')
    if (jsonStart < 0) return null
    const jsonSubString = message.substring(jsonStart)
    return isValidJson(jsonSubString) ? jsonSubString : null
}

function isValidJson(message) {
    try {
        JSON.parse(message)
    } catch (e) { return false }
    return true
}

function isNumeric(n) {
    return !isNaN(parseFloat(n)) && isFinite(n)
}

我的一位同事帮助我完成了大部分工作,然后我设法弄清楚了最后一点。

  • 我更新了 Serilog.Sinks.Udp 到 6.0.0
  • 我更新了 UDP 设置代码以使用 AddressFamily.InterNetwork 说明符,我认为它在 5.0.1 中不可用。
  • 我删除了使用 "tags" 丰富我们的日志消息,因为我相信它出现在 UDP 端点上以某种方式导致了某种冲突,我之前看到它停止记录而没有任何痕迹。

瞧!

这是新的日志设置代码:

loggerConfig
    .WriteTo.Udp(udpHost, udpPort, AddressFamily.InterNetwork, udpFormatter)
    .WriteTo.Console(outputTemplate: "[{Level:u}]: {Message}{NewLine}{Exception}");