Terraform:InvalidParameterException:PutSubscriptionFilter 操作无法与供应商 es 的 destinationArn 一起使用

Terraform: InvalidParameterException: PutSubscriptionFilter operation cannot work with destinationArn for vendor es

我正在尝试复制 AWS 控制台中的功能:只需点击一个日志组,选择"流式传输到 Elasticsearch"(Stream to Elasticsearch),然后选择我的某个托管 ES 域。

我认为 aws_cloudwatch_log_subscription_filter 是我要找的。尽管文档说只支持 Kinesis 和 Lambda,但我心想 "maybe the documentation is old/incomplete"

所以我试过了

# NOTE(review): this does NOT work — subscription filters only accept
# Kinesis/Firehose/Lambda destination ARNs; pointing destination_arn at an
# Elasticsearch domain ("vendor es") is rejected with the
# InvalidParameterException quoted above.
resource "aws_cloudwatch_log_subscription_filter" "whatever" {
  name            = "lambda_logs_to_es"
  role_arn        = aws_iam_role.my_lamda_role.arn
  log_group_name  = aws_cloudwatch_log_group.my_log_group.name
  # empty pattern forwards every log event
  filter_pattern  = ""
  destination_arn = "arn:aws:es:eu-west-3:585399047133:domain/my-es"
}

毫不奇怪,它告诉我

Terraform  InvalidParameterException: PutSubscriptionFilter operation cannot work with destinationArn for vendor es

那么有没有一种简单的方法可以使用 terraform 复制此功能,而无需编写我自己的 lambda,将我的其他 lambda(lambda-ception)的日志转发到 ES? (这是我选择 AWS 的 ES 而不是自己安装的原因之一,感觉它会更好地与其他 AWS 服务集成)?

即:是否已经存在(最好由 AWS 官方支持的)现成的 lambda 和 terraform 模块来实现此功能?

稍后我将使用更多代码编辑答案,但长话短说:

没什么神奇的:您在 AWS 控制台中的那次点击,实际上就是创建了一个把日志转发到 ES 的 lambda;CloudWatch 与 Amazon 托管的 Elasticsearch 之间并没有什么神奇的直接链接。

所以我最后做了

#
# Create a lambda that will forward other lambda's logs
# to the VPC's elasticsearch, this whole configuration can be done
# manually though the "stream to elasticsearch" option when you
# choose where to stream a log group of cloudwatch
#

# IAM role the log-forwarding lambda executes under; the trust policy lets
# only the Lambda service assume it.
resource "aws_iam_role" "logs_to_es_lambda_role" {
  name = "logs_to_es_lambda_${local.env_type}"

  # standard Lambda trust policy (inline JSON heredoc)
  assume_role_policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": "sts:AssumeRole",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Effect": "Allow",
      "Sid": ""
    }
  ]
}
EOF
}

resource "aws_iam_role_policy_attachment" "attach_basic_role_to_logs_to_es" {
  role = aws_iam_role.logs_to_es_lambda_role.id
  # this managed policy lets the lambda write its own logs to CloudWatch
  policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
}

resource "aws_iam_role_policy_attachment" "access_vpc_role_to_logs_to_es" {
  role = aws_iam_role.logs_to_es_lambda_role.id
  # this managed policy lets the lambda attach ENIs inside the VPC (the
  # Elasticsearch domain is only reachable from within the VPC)
  policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole"
}

# Log group for the forwarder lambda itself; short retention (7 days)
# since these are plumbing logs, not application logs.
resource "aws_cloudwatch_log_group" "logs_to_es_log_group" {
  name              = "/aws/lambda/logs_to_es_lambda_role_${local.env_type}"
  retention_in_days = 7
}

# Security group for the forwarder lambda: no ingress rules (nothing calls
# the lambda over the network), all egress allowed so it can reach ES.
resource "aws_security_group" "logs_to_es" {
  description = "allow all egress network for the lambda logs_to_es"
  vpc_id      = module.vpc.vpc_id
  name        = "lambda-${local.env_type}-logs-to-es"

  # protocol "-1" with ports 0-0 means "all protocols, all ports"
  egress {
    protocol    = "-1"
    from_port   = 0
    to_port     = 0
    cidr_blocks = ["0.0.0.0/0"]
  }

  tags = {
    Name               = "lambda-${local.env_type}-logs-to-es"
    ManagedByTerraform = "true"
    EnvironmentType    = "${local.env_type}"
  }
}


# Zip the single-file Node.js forwarder so it can be uploaded as the
# lambda's deployment package.
data "archive_file" "logs_to_es_zip" {
  type        = "zip"
  source_file = "lambdas/logs_to_elasticsearch/logs_to_elasticsearch.js"
  output_path = "lambdas/logs_to_elasticsearch/logs_to_elasticsearch.zip"
}

# The forwarder lambda itself: runs inside the VPC and ships CloudWatch
# log events to the (VPC-only) Elasticsearch domain.
resource "aws_lambda_function" "logs_to_es" {

  # deployment package produced by the archive_file data source; the hash
  # triggers a redeploy whenever the source file changes
  filename         = data.archive_file.logs_to_es_zip.output_path
  source_code_hash = data.archive_file.logs_to_es_zip.output_base64sha256
  function_name    = "logs_to_es_${local.env_type}"

  role    = aws_iam_role.logs_to_es_lambda_role.arn
  handler = "logs_to_elasticsearch.handler"

  timeout = 100

  # NOTE(review): nodejs10.x has reached end-of-life on AWS Lambda — plan
  # a migration to a currently supported Node.js runtime
  runtime = "nodejs10.x"

  description = "${local.env_type} script that forwards cloudwatch logs of that VPC's lambda to ES"

  # attach to the VPC so the lambda can reach the VPC-only ES domain
  vpc_config {
    subnet_ids = module.vpc.private_subnets
    security_group_ids = [
      aws_security_group.logs_to_es.id
    ]
  }

  environment {
    variables = {
      ELASTICSEARCH_ENDPOINT = aws_elasticsearch_domain.technical_logs.endpoint
    }
  }

  tags = {
    Environment        = local.env_type
    ManagedByTerraform = "true"
  }

  # bare resource references: quoted string addresses in depends_on are
  # deprecated in Terraform 0.12 (which this file's syntax targets) and are
  # an error from Terraform 0.13 onwards
  depends_on = [
    aws_iam_role_policy_attachment.attach_basic_role_to_logs_to_es,
    aws_iam_role_policy_attachment.access_vpc_role_to_logs_to_es,
    aws_cloudwatch_log_group.logs_to_es_log_group,
  ]
}

lambda 的代码是

#
# Create a lambda that will forward other lambda's logs
# to the VPC's elasticsearch, this whole configuration can be done
# manually though the "stream to elasticsearch" option when you
# choose where to stream a log group of cloudwatch
#

# IAM role the log-forwarding lambda executes under; only Lambda may assume it.
resource "aws_iam_role" "logs_to_es_lambda_role" {
  name = "logs_to_es_lambda_${local.env_type}"

  # standard Lambda trust policy (inline JSON heredoc)
  assume_role_policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": "sts:AssumeRole",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Effect": "Allow",
      "Sid": ""
    }
  ]
}
EOF
}

resource "aws_iam_role_policy_attachment" "attach_basic_role_to_logs_to_es" {
  role = aws_iam_role.logs_to_es_lambda_role.id
  # this managed policy lets the lambda write its own logs to CloudWatch
  policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
}

resource "aws_iam_role_policy_attachment" "access_vpc_role_to_logs_to_es" {
  role = aws_iam_role.logs_to_es_lambda_role.id
  # this managed policy lets the lambda attach ENIs inside the VPC (the
  # Elasticsearch domain is only reachable from within the VPC)
  policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole"
}

# Log group for the forwarder lambda itself; short retention (7 days).
resource "aws_cloudwatch_log_group" "logs_to_es_log_group" {
  name              = "/aws/lambda/logs_to_es_lambda_role_${local.env_type}"
  retention_in_days = 7
}

# Security group: no ingress, all egress so the lambda can reach ES.
resource "aws_security_group" "logs_to_es" {
  description = "allow all egress network for the lambda logs_to_es"
  vpc_id      = module.vpc.vpc_id
  name        = "lambda-${local.env_type}-logs-to-es"

  # protocol "-1" with ports 0-0 means "all protocols, all ports"
  egress {
    protocol    = "-1"
    from_port   = 0
    to_port     = 0
    cidr_blocks = ["0.0.0.0/0"]
  }

  tags = {
    Name               = "lambda-${local.env_type}-logs-to-es"
    ManagedByTerraform = "true"
    EnvironmentType    = "${local.env_type}"
  }
}


# Zip the single-file Node.js forwarder for upload as the lambda package.
data "archive_file" "logs_to_es_zip" {
  type        = "zip"
  source_file = "lambdas/logs_to_elasticsearch/logs_to_elasticsearch.js"
  output_path = "lambdas/logs_to_elasticsearch/logs_to_elasticsearch.zip"
}

resource "aws_lambda_function" "logs_to_es" {

  # deployment package from archive_file; the hash triggers redeploys
  filename         = data.archive_file.logs_to_es_zip.output_path
  source_code_hash = data.archive_file.logs_to_es_zip.output_base64sha256
  function_name    = "logs_to_es_${local.env_type}"

  role    = aws_iam_role.logs_to_es_lambda_role.arn
  handler = "logs_to_elasticsearch.handler"

  timeout = 100

  # NOTE(review): nodejs10.x has reached end-of-life on AWS Lambda — plan
  # a migration to a currently supported Node.js runtime
  runtime = "nodejs10.x"

  description = "${local.env_type} script that forwards cloudwatch logs of that VPC's lambda to ES"

  # attach to the VPC so the lambda can reach the VPC-only ES domain
  vpc_config {
    subnet_ids = module.vpc.private_subnets
    security_group_ids = [
      aws_security_group.logs_to_es.id
    ]
  }

  environment {
    variables = {
      ELASTICSEARCH_ENDPOINT = aws_elasticsearch_domain.technical_logs.endpoint
    }
  }

  tags = {
    Environment        = local.env_type
    ManagedByTerraform = "true"
  }

  # bare resource references: quoted string addresses in depends_on are
  # deprecated in Terraform 0.12 and an error from Terraform 0.13 onwards
  depends_on = [
    aws_iam_role_policy_attachment.attach_basic_role_to_logs_to_es,
    aws_iam_role_policy_attachment.access_vpc_role_to_logs_to_es,
    aws_cloudwatch_log_group.logs_to_es_log_group,
  ]
}

然后要使用它,您只需将该 lambda ARN 放入 aws_cloudwatch_log_subscription_filter 资源中。

补充一下 @allan.simon 的回答。可能是我弄错了,但他没有在回答中贴出 lambda 函数的代码,所以我把代码贴在这里,供需要的人直接使用,而不用手动生成。

// v1.1.2
var https = require('https');
var zlib = require('zlib');
var crypto = require('crypto');

// Hostname of the target Amazon Elasticsearch domain (no scheme), e.g.
// "search-mydomain-xxxx.eu-west-3.es.amazonaws.com"; buildRequest() parses
// region and service out of this name.
var endpoint = 'YOUR ENDPOINT HERE';

// Set this to true if you want to debug why data isn't making it to
// your Elasticsearch cluster. This will enable logging of failed items
// to CloudWatch Logs.
var logFailedResponses = false;

exports.handler = function(input, context) {
    // decode input from base64
    var zippedInput = new Buffer.from(input.awslogs.data, 'base64');

    // decompress the input
    zlib.gunzip(zippedInput, function(error, buffer) {
        if (error) { context.fail(error); return; }

        // parse the input from JSON
        var awslogsData = JSON.parse(buffer.toString('utf8'));

        // transform the input to Elasticsearch documents
        var elasticsearchBulkData = transform(awslogsData);

        // skip control messages
        if (!elasticsearchBulkData) {
            console.log('Received a control message');
            context.succeed('Control message handled successfully');
            return;
        }

        // post documents to the Amazon Elasticsearch Service
        post(elasticsearchBulkData, function(error, success, statusCode, failedItems) {
            console.log('Response: ' + JSON.stringify({
                "statusCode": statusCode
            }));

            if (error) {
                logFailure(error, failedItems);
                context.fail(JSON.stringify(error));
            } else {
                console.log('Success: ' + JSON.stringify(success));
                context.succeed('Success');
            }
        });
    });
};

// Convert a decoded CloudWatch Logs payload into an Elasticsearch _bulk
// request body: one action line plus one document line per log event.
// Returns null for CONTROL_MESSAGE payloads so the caller can skip them.
function transform(payload) {
    if (payload.messageType === 'CONTROL_MESSAGE') {
        return null;
    }

    var lines = payload.logEvents.map(function(logEvent) {
        var eventDate = new Date(1 * logEvent.timestamp);

        // daily index, format cwl-YYYY.MM.DD (UTC)
        var indexName = [
            'cwl-' + eventDate.getUTCFullYear(),
            ('0' + (eventDate.getUTCMonth() + 1)).slice(-2),
            ('0' + eventDate.getUTCDate()).slice(-2)
        ].join('.');

        // document body plus CloudWatch metadata under '@'-prefixed keys
        var source = buildSource(logEvent.message, logEvent.extractedFields);
        source['@id'] = logEvent.id;
        source['@timestamp'] = new Date(1 * logEvent.timestamp).toISOString();
        source['@message'] = logEvent.message;
        source['@owner'] = payload.owner;
        source['@log_group'] = payload.logGroup;
        source['@log_stream'] = payload.logStream;

        var action = { "index": {} };
        action.index._index = indexName;
        action.index._type = payload.logGroup;
        action.index._id = logEvent.id;

        return JSON.stringify(action) + '\n' + JSON.stringify(source) + '\n';
    });

    return lines.join('');
}

// Build the ES document body for one log event.
// With extractedFields (from a subscription filter pattern): numeric-looking
// values are indexed as numbers, values embedding a JSON object get a parsed
// copy under a '$'-prefixed key alongside the raw string.
// Without extractedFields: index any JSON object embedded in the message,
// otherwise return an empty object (metadata is added by the caller).
function buildSource(message, extractedFields) {
    // FIX: jsonSubString was assigned without a declaration, leaking an
    // implicit global (a ReferenceError under "use strict")
    var jsonSubString;

    if (extractedFields) {
        var source = {};

        for (var key in extractedFields) {
            if (extractedFields.hasOwnProperty(key) && extractedFields[key]) {
                var value = extractedFields[key];

                if (isNumeric(value)) {
                    source[key] = 1 * value;
                    continue;
                }

                jsonSubString = extractJson(value);
                if (jsonSubString !== null) {
                    source['$' + key] = JSON.parse(jsonSubString);
                }

                source[key] = value;
            }
        }
        return source;
    }

    jsonSubString = extractJson(message);
    if (jsonSubString !== null) {
        return JSON.parse(jsonSubString);
    }

    return {};
}

// Return the substring of `message` starting at the first '{' when that
// substring parses as JSON; otherwise return null.
function extractJson(message) {
    var braceIndex = message.indexOf('{');
    if (braceIndex < 0) {
        return null;
    }
    var candidate = message.substring(braceIndex);
    if (isValidJson(candidate)) {
        return candidate;
    }
    return null;
}

// True when `message` is a syntactically valid JSON text.
function isValidJson(message) {
    var parseable = true;
    try {
        JSON.parse(message);
    } catch (e) {
        parseable = false;
    }
    return parseable;
}

// True when `n` (string or number) represents a finite numeric value.
function isNumeric(n) {
    if (isNaN(parseFloat(n))) {
        return false;
    }
    return isFinite(n);
}

// POST an ES _bulk body to the module-level `endpoint` and report the
// outcome via callback(error, success, statusCode, failedItems):
//   - success: set for 2xx responses; summarizes attempted/successful/failed
//     item counts from the bulk response
//   - error: set when the status isn't 200 or the bulk response reports
//     "errors": true; per-item bodies are stripped first so only access-type
//     errors get logged, not every failed entry
function post(body, callback) {
    var requestParams = buildRequest(endpoint, body);

    var request = https.request(requestParams, function(response) {
        // accumulate the whole response body before parsing
        var responseBody = '';
        response.on('data', function(chunk) {
            responseBody += chunk;
        });

        response.on('end', function() {
            var info = JSON.parse(responseBody);
            var failedItems;
            var success;
            var error;

            // NOTE(review): the success range is 200-298; a 299 would fall
            // through both branches — presumably never occurs in practice
            if (response.statusCode >= 200 && response.statusCode < 299) {
                failedItems = info.items.filter(function(x) {
                    return x.index.status >= 300;
                });

                success = {
                    "attemptedItems": info.items.length,
                    "successfulItems": info.items.length - failedItems.length,
                    "failedItems": failedItems.length
                };
            }

            if (response.statusCode !== 200 || info.errors === true) {
                // prevents logging of failed entries, but allows logging
                // of other errors such as access restrictions
                delete info.items;
                error = {
                    statusCode: response.statusCode,
                    responseBody: info
                };
            }

            callback(error, success, response.statusCode, failedItems);
        });
    }).on('error', function(e) {
        callback(e);
    });
    request.end(requestParams.body);
}

// Build an AWS Signature Version 4 signed request description for POSTing
// `body` to the domain's /_bulk API, signed with the lambda's temporary
// credentials from the environment (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY,
// AWS_SESSION_TOKEN). The statement order below mirrors the SigV4 spec;
// do not reorder.
function buildRequest(endpoint, body) {
    // region and service come out of the hostname; assumes the endpoint
    // looks like <domain>.<region>.<service>.amazonaws.com
    var endpointParts = endpoint.match(/^([^\.]+)\.?([^\.]*)\.?([^\.]*)\.amazonaws\.com$/);
    var region = endpointParts[2];
    var service = endpointParts[3];
    // SigV4 timestamps: YYYYMMDD'T'HHMMSS'Z' and the date-only prefix
    var datetime = (new Date()).toISOString().replace(/[:\-]|\.\d{3}/g, '');
    var date = datetime.substr(0, 8);
    // SigV4 signing-key derivation chain: date -> region -> service -> label
    var kDate = hmac('AWS4' + process.env.AWS_SECRET_ACCESS_KEY, date);
    var kRegion = hmac(kDate, region);
    var kService = hmac(kRegion, service);
    var kSigning = hmac(kService, 'aws4_request');

    var request = {
        host: endpoint,
        method: 'POST',
        path: '/_bulk',
        body: body,
        headers: {
            'Content-Type': 'application/json',
            'Host': endpoint,
            'Content-Length': Buffer.byteLength(body),
            'X-Amz-Security-Token': process.env.AWS_SESSION_TOKEN,
            'X-Amz-Date': datetime
        }
    };

    // canonical headers: lowercase name:value pairs, sorted by name
    var canonicalHeaders = Object.keys(request.headers)
        .sort(function(a, b) { return a.toLowerCase() < b.toLowerCase() ? -1 : 1; })
        .map(function(k) { return k.toLowerCase() + ':' + request.headers[k]; })
        .join('\n');

    // semicolon-joined list of the signed (lowercased) header names
    var signedHeaders = Object.keys(request.headers)
        .map(function(k) { return k.toLowerCase(); })
        .sort()
        .join(';');

    // canonical request: method, path, (empty) query string, headers,
    // signed-header list and the hex SHA-256 of the payload
    var canonicalString = [
        request.method,
        request.path, '',
        canonicalHeaders, '',
        signedHeaders,
        hash(request.body, 'hex'),
    ].join('\n');

    var credentialString = [ date, region, service, 'aws4_request' ].join('/');

    // string-to-sign: algorithm, timestamp, credential scope, hashed request
    var stringToSign = [
        'AWS4-HMAC-SHA256',
        datetime,
        credentialString,
        hash(canonicalString, 'hex')
    ] .join('\n');

    // final Authorization header with the hex signature
    request.headers.Authorization = [
        'AWS4-HMAC-SHA256 Credential=' + process.env.AWS_ACCESS_KEY_ID + '/' + credentialString,
        'SignedHeaders=' + signedHeaders,
        'Signature=' + hmac(kSigning, stringToSign, 'hex')
    ].join(', ');

    return request;
}

function hmac(key, str, encoding) {
    return crypto.createHmac('sha256', key).update(str, 'utf8').digest(encoding);
}

function hash(str, encoding) {
    return crypto.createHash('sha256').update(str, 'utf8').digest(encoding);
}

// When the module-level logFailedResponses flag is enabled, dump the error
// (and any failed bulk items) to CloudWatch Logs for debugging.
function logFailure(error, failedItems) {
    if (!logFailedResponses) {
        return;
    }

    console.log('Error: ' + JSON.stringify(error, null, 2));

    if (failedItems && failedItems.length > 0) {
        console.log("Failed Items: " + JSON.stringify(failedItems, null, 2));
    }
}