Python/Boto - Writing to AWS CloudWatch Logs without sequence token

I'm trying to send logs to AWS CloudWatch Logs using Python and the Boto framework. I'm doing this:

res = logs.put_log_events("FOO", "BAR",
    [{'timestamp': int(round(time.time() * 1000)),
      'message': time.strftime("%m/%d/%Y %H:%M:%S") + ' Scheduled monitoring check'}],
    sequence_token=None)

Every time I run it, I get this error:
boto.logs.exceptions.InvalidSequenceTokenException: InvalidSequenceTokenException: 400 Bad Request
{u'message': u'The given sequenceToken is invalid. The next expected sequenceToken is: 49540113336360065754596906019042392283494234157161146226', u'expectedSequenceToken': u'49540113336360065754596906019042392283494234157161146226', u'__type': u'InvalidSequenceTokenException'}

Storing that token is rather impractical for me. It makes no sense; why can't I just append to a log stream?

How can I get around this?

You can't; that's how it works:

Every PutLogEvents request must include the sequenceToken obtained from the response of the previous request. An upload in a newly created log stream does not require a sequenceToken.

(source)

You can work around it by looking up the uploadSequenceToken via describe_log_streams() first.

Essentially, the process is that you use logStreamNamePrefix to specifically identify the log stream you want to append to, and then parse the uploadSequenceToken out of the response; a short sketch follows the request syntax below.

Response Syntax

 {
     'logStreams': [
         {
             'logStreamName': 'string',
             'creationTime': 123,
             'firstEventTimestamp': 123,
             'lastEventTimestamp': 123,
             'lastIngestionTime': 123,
             'uploadSequenceToken': 'string',
             'arn': 'string',
             'storedBytes': 123
         },
     ],
     'nextToken': 'string'
 }

Returns all the log streams that are associated with the specified log group. The list returned in the response is ASCII-sorted by log stream name.

By default, this operation returns up to 50 log streams. If there are more log streams to list, the response would contain a nextToken value in the response body. You can also limit the number of log streams returned in the response by specifying the limit parameter in the request. This operation has a limit of five transactions per second, after which transactions are throttled.

Request Syntax

response = client.describe_log_streams(
    logGroupName='string',
    logStreamNamePrefix='string',
    orderBy='LogStreamName'|'LastEventTime',
    descending=True|False,
    nextToken='string',
    limit=123
)
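
Putting the pieces together, a minimal boto3 sketch could look like the following. This is only an illustration: the 'FOO' group and 'BAR' stream are the placeholder names from the question, and both are assumed to already exist.

import time
import boto3

client = boto3.client('logs')

# Placeholder names from the question; group and stream are assumed to exist.
LOG_GROUP = 'FOO'
LOG_STREAM = 'BAR'

# Find the exact stream by prefix and pull its current uploadSequenceToken.
streams = client.describe_log_streams(
    logGroupName=LOG_GROUP,
    logStreamNamePrefix=LOG_STREAM
)['logStreams']

kwargs = {
    'logGroupName': LOG_GROUP,
    'logStreamName': LOG_STREAM,
    'logEvents': [{
        'timestamp': int(round(time.time() * 1000)),
        'message': time.strftime('%m/%d/%Y %H:%M:%S') + ' Scheduled monitoring check'
    }],
}

# A brand-new stream has no uploadSequenceToken yet, so only pass it when present.
if streams and 'uploadSequenceToken' in streams[0]:
    kwargs['sequenceToken'] = streams[0]['uploadSequenceToken']

response = client.put_log_events(**kwargs)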

To answer the why part with an educated guess: it's the nature of a scalable asynchronous service.

If Amazon did not require you to maintain a sequence token, they could never scale their CloudWatch service across multiple instances while still being able to guarantee that your logs appear in exactly the same order in which they occurred (and imagine how annoying out-of-order log entries would be when debugging a problem). Any slight deviation in clocks, network latency, or other delays along the path to the log acceptor would introduce ordering problems.

But since they do require you to provide a sequence token, they can easily scale the service out and simply merge-sort incoming log entries back together, while still preserving the correct ordering of your logs.

AWS CloudWatch PutLogEvents code

import boto3
import time


client = boto3.client('logs')

LOG_GROUP = 'cloudwatch_customlog'
LOG_STREAM = '{}-{}'.format(time.strftime('%Y-%m-%d'), 'logstream')

# Create the log group and stream if they don't exist yet.
try:
    client.create_log_group(logGroupName=LOG_GROUP)
except client.exceptions.ResourceAlreadyExistsException:
    pass

try:
    client.create_log_stream(logGroupName=LOG_GROUP, logStreamName=LOG_STREAM)
except client.exceptions.ResourceAlreadyExistsException:
    pass

# Look up the stream to obtain its current uploadSequenceToken.
response = client.describe_log_streams(
    logGroupName=LOG_GROUP,
    logStreamNamePrefix=LOG_STREAM
)

event_log = {
    'logGroupName': LOG_GROUP,
    'logStreamName': LOG_STREAM,
    'logEvents': [
        {
            'timestamp': int(round(time.time() * 1000)),
            'message': time.strftime('%Y-%m-%d %H:%M:%S') + '\t Your custom log messages'
        }
    ],
}

# A freshly created stream has no token yet; only pass one when present.
if 'uploadSequenceToken' in response['logStreams'][0]:
    event_log.update({'sequenceToken': response['logStreams'][0]['uploadSequenceToken']})

response = client.put_log_events(**event_log)
print(response)

As stated in the docs:

You can also get the sequence token in the expectedSequenceToken field from InvalidSequenceTokenException.

However, the problem is that boto3 doesn't expose an expectedSequenceToken field on the exception, as discussed in the issue:

Boto3 doesn't support parsing additional parameters from exceptions, it only adds a Code and a Message. Labeling as a feature request, I think this is something we should add, but for now your best workaround would be to parse the error message.

Obviously, getting the token by parsing the message isn't ideal, since the format of the message may change. But it gives a simple working solution without having to call describe_log_streams.

import boto3
import time


def append_log(group: str, stream: str, msg: str):
    logs = boto3.client('logs')

    def put(token=None, repeat: int = 0):
        events = [{
            'timestamp': int(round(time.time() * 1000)),
            'message': msg
        }]
        try:
            if token:
                logs.put_log_events(logGroupName=group, logStreamName=stream, logEvents=events, sequenceToken=token)
            else:
                logs.put_log_events(logGroupName=group, logStreamName=stream, logEvents=events)
        except (logs.exceptions.InvalidSequenceTokenException, logs.exceptions.DataAlreadyAcceptedException) as e:
            # The expected token is only available in the error message text,
            # so parse it out and retry with it.
            error_msg = e.response['Error']['Message']
            if repeat > 10:
                raise Exception("Too many repeats to write log")
            put(error_msg[error_msg.index(":") + 1:].strip(), repeat + 1)

    try:
        put()
    except logs.exceptions.ResourceNotFoundException:
        try:
            logs.create_log_stream(logGroupName=group, logStreamName=stream)
        except logs.exceptions.ResourceNotFoundException:
            logs.create_log_group(logGroupName=group)
            logs.create_log_stream(logGroupName=group, logStreamName=stream)
        put()

The function creates the group and the stream if they don't exist.
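
For example, a call like the following (the group and stream names are made-up placeholders) writes a single event and bootstraps the group and stream on first use:

append_log('my-app-logs', 'worker-1', 'Scheduled monitoring check')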

Here is a logging class I created with the help of the other answers here, which doesn't require the logs:DescribeLogStreams IAM permission. It's a standalone module you can import (boto3 is passed in when the class is initialized).

import time

class CloudWatch:
    def __init__(self, boto3, log_group):
        self.client = boto3.client("logs")
        self.log_group = log_group
        self.sequence_token = None

    def log(self, message):
        
        print(message) # Delete this if you don't want stdout as well.

        log_stream = time.strftime('%Y-%m-%d')

        event_log = {
            'logGroupName': self.log_group,
            'logStreamName': log_stream,
            'logEvents': [
                {
                    'timestamp': int(round(time.time() * 1000)),
                    'message': message
                }
            ],
        }

        if self.sequence_token is not None:
            event_log.update({"sequenceToken" : self.sequence_token})

        # Try up to three times: create the missing group/stream or refresh the
        # sequence token from the error message, then retry the put.
        for _ in range(3):
            try:
                response = self.client.put_log_events(**event_log)
                self.sequence_token = response["nextSequenceToken"]
                return
            except self.client.exceptions.ResourceNotFoundException:
                try:
                    self.client.create_log_group(logGroupName=self.log_group)
                except self.client.exceptions.ResourceAlreadyExistsException:
                    pass
                try:
                    self.client.create_log_stream(logGroupName=self.log_group, logStreamName=log_stream)
                except self.client.exceptions.ResourceAlreadyExistsException:
                    pass
            except self.client.exceptions.InvalidSequenceTokenException as e:
                event_log.update({"sequenceToken" : e.response["Error"]["Message"].split("is: ")[-1]})
                continue
            except self.client.exceptions.DataAlreadyAcceptedException:
                return
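
A minimal usage sketch (the log group name is a made-up placeholder; the boto3 module itself is passed in, matching the constructor above):

import boto3

cw = CloudWatch(boto3, 'my-app-logs')  # hypothetical log group name
cw.log('Scheduled monitoring check')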

While most of the answers here work, if you have multiple processes writing to the same stream very quickly, they will keep getting exceptions, and if you just retry in a loop, the same race condition applies. Everyone should be aware of this!