通过 Serilog UDP 接收器和 logstash 的 AWS Lambda 日志记录静默失败
AWS Lambda logging through Serilog UDP sink and logstash silently fails
我们有一个 .NET Core 2.1 AWS Lambda,我正试图将其连接到我们现有的日志系统中。
我正在尝试使用 UDP 接收器通过 Serilog 登录到我们的 logstash 实例,以便摄取到托管在私有 VPC 上的 ElasticSearch 日志数据库。 运行 通过控制台在本地记录良好,既可以记录到控制台本身,也可以通过 UDP 记录到 Elastic。但是,当它作为 lambda 运行时,它只会记录到控制台(即 CloudWatch),并且不会输出任何表明任何错误的信息。可能是因为 UDP 是无状态的?
NuGet 包和版本:
- Serilog 2.7.1
- Serilog.Sinks.Udp 5.0.1
这是我们使用的日志记录代码:
public static void Configure(string udpHost, int udpPort, string environment)
{
var udpFormatter = new JsonFormatter(renderMessage: true);
var loggerConfig = new LoggerConfiguration()
.Enrich.FromLogContext()
.MinimumLevel.Information()
.Enrich.WithProperty("applicationName", Assembly.GetExecutingAssembly().GetName().Name)
.Enrich.WithProperty("applicationVersion", Assembly.GetExecutingAssembly().GetName().Version.ToString())
.Enrich.WithProperty("tags", environment);
loggerConfig
.WriteTo.Console(outputTemplate: "[{Level:u}]: {Message}{N---ewLine}{Exception}")
.WriteTo.Udp(udpHost, udpPort, udpFormatter);
var logger = loggerConfig.CreateLogger();
Serilog.Log.Logger = logger;
Serilog.Debugging.SelfLog.Enable(Console.Error);
}
// this is output in the console from the lambda, but doesn't appear in the Database from the lambda
// when run locally, appears in both
Serilog.Log.Logger.Information("Hello from Serilog!");
...
// at end of lambda
Serilog.Log.CloseAndFlush();
这是我们在 logstash 上的 UDP 输入:
udp {
port => 5000
tags => [ 'systest', 'serilog-nested' ]
codec => json
}
有谁知道我该如何解决这个问题?或者甚至只是看看具体有什么问题,这样我就可以开始寻找解决方案。
到目前为止尝试过的事情包括:
- 从 lambda ping logstash - 不可能,lambda 没有 ICMP
- 各种尝试让 UDP 接收器输出错误,如上所示,各种尝试。即使输入一个完全伪造的地址也不会产生错误
- 将 lambda 添加到我知道可以从中进行日志记录的 VPC
- 在 lambda 末尾睡觉。这样日志就有时间在 lambda 退出之前通过
- 正在检查 logstash 日志以查看是否有任何异常。事实并非如此。而本地运行顺利通过的事实让我认为并非如此。
- 直接使用UDP。它似乎没有到达服务器。我不确定这是连接问题还是只是来自 lambda 的 UDP 本身。
- 大量的咒骂和咒骂
根据我上面的评论,您可以像这样创建一个日志订阅并流式传输到 ES,我知道这是 NodeJS,所以它不是正确的答案,但您可以从这里找出答案:
/* eslint-disable */
// Eslint disabled as this is adapted AWS code.
const zlib = require('zlib')
const { Client } = require('@elastic/elasticsearch')
const elasticsearch = new Client({ ES_CLUSTER_DETAILS })
/**
* This is an example function to stream CloudWatch logs to ElasticSearch.
* @param event
* @param context
* @param callback
*/
export default (event, context, callback) => {
context.callbackWaitsForEmptyEventLoop = true
const payload = new Buffer(event.awslogs.data, 'base64')
zlib.gunzip(payload, (err, result) => {
if (err) {
return callback(null, err)
}
const logObject = JSON.parse(result.toString('utf8'))
const elasticsearchBulkData = transform(logObject)
const params = { body: [] }
params.body.push(elasticsearchBulkData)
esClient.bulk(params, (err, resp) => {
if (err) {
callback(null, 'success')
return
}
})
callback(null, 'success')
})
}
function transform(payload) {
if (payload.messageType === 'CONTROL_MESSAGE') {
return null
}
let bulkRequestBody = ''
payload.logEvents.forEach((logEvent) => {
const timestamp = new Date(1 * logEvent.timestamp)
// index name format: cwl-YYYY.MM.DD
const indexName = [
`cwl-${process.env.NODE_ENV}-${timestamp.getUTCFullYear()}`, // year
(`0${timestamp.getUTCMonth() + 1}`).slice(-2), // month
(`0${timestamp.getUTCDate()}`).slice(-2), // day
].join('.')
const source = buildSource(logEvent.message, logEvent.extractedFields)
source['@id'] = logEvent.id
source['@timestamp'] = new Date(1 * logEvent.timestamp).toISOString()
source['@message'] = logEvent.message
source['@owner'] = payload.owner
source['@log_group'] = payload.logGroup
source['@log_stream'] = payload.logStream
const action = { index: {} }
action.index._index = indexName
action.index._type = 'lambdaLogs'
action.index._id = logEvent.id
bulkRequestBody += `${[
JSON.stringify(action),
JSON.stringify(source),
].join('\n')}\n`
})
return bulkRequestBody
}
function buildSource(message, extractedFields) {
if (extractedFields) {
const source = {}
for (const key in extractedFields) {
if (extractedFields.hasOwnProperty(key) && extractedFields[key]) {
const value = extractedFields[key]
if (isNumeric(value)) {
source[key] = 1 * value
continue
}
const jsonSubString = extractJson(value)
if (jsonSubString !== null) {
source[`$${key}`] = JSON.parse(jsonSubString)
}
source[key] = value
}
}
return source
}
const jsonSubString = extractJson(message)
if (jsonSubString !== null) {
return JSON.parse(jsonSubString)
}
return {}
}
function extractJson(message) {
const jsonStart = message.indexOf('{')
if (jsonStart < 0) return null
const jsonSubString = message.substring(jsonStart)
return isValidJson(jsonSubString) ? jsonSubString : null
}
function isValidJson(message) {
try {
JSON.parse(message)
} catch (e) { return false }
return true
}
function isNumeric(n) {
return !isNaN(parseFloat(n)) && isFinite(n)
}
我的一位同事帮助我完成了大部分工作,然后我设法弄清楚了最后一点。
- 我更新了 Serilog.Sinks.Udp 到 6.0.0
- 我更新了 UDP 设置代码以使用 AddressFamily.InterNetwork 说明符,我认为它在 5.0.1 中不可用。
- 我删除了使用 "tags" 丰富我们的日志消息,因为我相信它出现在 UDP 端点上以某种方式导致了某种冲突,我之前看到它停止记录而没有任何痕迹。
瞧!
这是新的日志设置代码:
loggerConfig
.WriteTo.Udp(udpHost, udpPort, AddressFamily.InterNetwork, udpFormatter)
.WriteTo.Console(outputTemplate: "[{Level:u}]: {Message}{NewLine}{Exception}");
我们有一个 .NET Core 2.1 AWS Lambda,我正试图将其连接到我们现有的日志系统中。
我正在尝试使用 UDP 接收器通过 Serilog 登录到我们的 logstash 实例,以便摄取到托管在私有 VPC 上的 ElasticSearch 日志数据库。 运行 通过控制台在本地记录良好,既可以记录到控制台本身,也可以通过 UDP 记录到 Elastic。但是,当它作为 lambda 运行时,它只会记录到控制台(即 CloudWatch),并且不会输出任何表明任何错误的信息。可能是因为 UDP 是无状态的?
NuGet 包和版本:
- Serilog 2.7.1
- Serilog.Sinks.Udp 5.0.1
这是我们使用的日志记录代码:
public static void Configure(string udpHost, int udpPort, string environment)
{
var udpFormatter = new JsonFormatter(renderMessage: true);
var loggerConfig = new LoggerConfiguration()
.Enrich.FromLogContext()
.MinimumLevel.Information()
.Enrich.WithProperty("applicationName", Assembly.GetExecutingAssembly().GetName().Name)
.Enrich.WithProperty("applicationVersion", Assembly.GetExecutingAssembly().GetName().Version.ToString())
.Enrich.WithProperty("tags", environment);
loggerConfig
.WriteTo.Console(outputTemplate: "[{Level:u}]: {Message}{N---ewLine}{Exception}")
.WriteTo.Udp(udpHost, udpPort, udpFormatter);
var logger = loggerConfig.CreateLogger();
Serilog.Log.Logger = logger;
Serilog.Debugging.SelfLog.Enable(Console.Error);
}
// this is output in the console from the lambda, but doesn't appear in the Database from the lambda
// when run locally, appears in both
Serilog.Log.Logger.Information("Hello from Serilog!");
...
// at end of lambda
Serilog.Log.CloseAndFlush();
这是我们在 logstash 上的 UDP 输入:
udp {
port => 5000
tags => [ 'systest', 'serilog-nested' ]
codec => json
}
有谁知道我该如何解决这个问题?或者甚至只是看看具体有什么问题,这样我就可以开始寻找解决方案。
到目前为止尝试过的事情包括:
- 从 lambda ping logstash - 不可能,lambda 没有 ICMP
- 各种尝试让 UDP 接收器输出错误,如上所示,各种尝试。即使输入一个完全伪造的地址也不会产生错误
- 将 lambda 添加到我知道可以从中进行日志记录的 VPC
- 在 lambda 末尾睡觉。这样日志就有时间在 lambda 退出之前通过
- 正在检查 logstash 日志以查看是否有任何异常。事实并非如此。而本地运行顺利通过的事实让我认为并非如此。
- 直接使用UDP。它似乎没有到达服务器。我不确定这是连接问题还是只是来自 lambda 的 UDP 本身。
- 大量的咒骂和咒骂
根据我上面的评论,您可以像这样创建一个日志订阅并流式传输到 ES,我知道这是 NodeJS,所以它不是正确的答案,但您可以从这里找出答案:
/* eslint-disable */
// Eslint disabled as this is adapted AWS code.
const zlib = require('zlib')
const { Client } = require('@elastic/elasticsearch')
const elasticsearch = new Client({ ES_CLUSTER_DETAILS })
/**
* This is an example function to stream CloudWatch logs to ElasticSearch.
* @param event
* @param context
* @param callback
*/
export default (event, context, callback) => {
context.callbackWaitsForEmptyEventLoop = true
const payload = new Buffer(event.awslogs.data, 'base64')
zlib.gunzip(payload, (err, result) => {
if (err) {
return callback(null, err)
}
const logObject = JSON.parse(result.toString('utf8'))
const elasticsearchBulkData = transform(logObject)
const params = { body: [] }
params.body.push(elasticsearchBulkData)
esClient.bulk(params, (err, resp) => {
if (err) {
callback(null, 'success')
return
}
})
callback(null, 'success')
})
}
function transform(payload) {
if (payload.messageType === 'CONTROL_MESSAGE') {
return null
}
let bulkRequestBody = ''
payload.logEvents.forEach((logEvent) => {
const timestamp = new Date(1 * logEvent.timestamp)
// index name format: cwl-YYYY.MM.DD
const indexName = [
`cwl-${process.env.NODE_ENV}-${timestamp.getUTCFullYear()}`, // year
(`0${timestamp.getUTCMonth() + 1}`).slice(-2), // month
(`0${timestamp.getUTCDate()}`).slice(-2), // day
].join('.')
const source = buildSource(logEvent.message, logEvent.extractedFields)
source['@id'] = logEvent.id
source['@timestamp'] = new Date(1 * logEvent.timestamp).toISOString()
source['@message'] = logEvent.message
source['@owner'] = payload.owner
source['@log_group'] = payload.logGroup
source['@log_stream'] = payload.logStream
const action = { index: {} }
action.index._index = indexName
action.index._type = 'lambdaLogs'
action.index._id = logEvent.id
bulkRequestBody += `${[
JSON.stringify(action),
JSON.stringify(source),
].join('\n')}\n`
})
return bulkRequestBody
}
function buildSource(message, extractedFields) {
if (extractedFields) {
const source = {}
for (const key in extractedFields) {
if (extractedFields.hasOwnProperty(key) && extractedFields[key]) {
const value = extractedFields[key]
if (isNumeric(value)) {
source[key] = 1 * value
continue
}
const jsonSubString = extractJson(value)
if (jsonSubString !== null) {
source[`$${key}`] = JSON.parse(jsonSubString)
}
source[key] = value
}
}
return source
}
const jsonSubString = extractJson(message)
if (jsonSubString !== null) {
return JSON.parse(jsonSubString)
}
return {}
}
function extractJson(message) {
const jsonStart = message.indexOf('{')
if (jsonStart < 0) return null
const jsonSubString = message.substring(jsonStart)
return isValidJson(jsonSubString) ? jsonSubString : null
}
function isValidJson(message) {
try {
JSON.parse(message)
} catch (e) { return false }
return true
}
function isNumeric(n) {
return !isNaN(parseFloat(n)) && isFinite(n)
}
我的一位同事帮助我完成了大部分工作,然后我设法弄清楚了最后一点。
- 我更新了 Serilog.Sinks.Udp 到 6.0.0
- 我更新了 UDP 设置代码以使用 AddressFamily.InterNetwork 说明符,我认为它在 5.0.1 中不可用。
- 我删除了使用 "tags" 丰富我们的日志消息,因为我相信它出现在 UDP 端点上以某种方式导致了某种冲突,我之前看到它停止记录而没有任何痕迹。
瞧!
这是新的日志设置代码:
loggerConfig
.WriteTo.Udp(udpHost, udpPort, AddressFamily.InterNetwork, udpFormatter)
.WriteTo.Console(outputTemplate: "[{Level:u}]: {Message}{NewLine}{Exception}");