在 Python 日志记录模块 AKA 日志压缩中抑制具有相同内容的多条消息
Suppress multiple messages with same content in Python logging module AKA log compression
根据设计,我的应用程序有时会产生重复的错误,这些错误会填满日志文件并让人难以阅读。看起来像这样:
WARNING:__main__:CRON10: clock unset or no wind update received in 60 sec -> supressed rrd update
WARNING:__main__:CRON10: clock unset or no wind update received in 60 sec -> supressed rrd update
WARNING:__main__:CRON10: clock unset or no wind update received in 60 sec -> supressed rrd update
WARNING:__main__:CRON10: clock unset or no wind update received in 60 sec -> supressed rrd update
如何使用 Python 日志记录模块来抑制重复消息并输出更多 rsyslog 样式的内容 (http://www.rsyslog.com/doc/rsconf1_repeatedmsgreduction.html):
WARNING:__main__:CRON10: clock unset or no wind update received in 60 sec -> supressed rrd update
--- The last message repeated 3 times
有没有办法扩展日志记录,或者我是否必须编写一个完全自己的记录器?
我用于记录的代码是:
logging.basicConfig(format='%(asctime)s %(message)s')
logging.basicConfig(level=logging.info)
logger = logging.getLogger(__name__)
hdlr = logging.FileHandler(LOGFILE)
hdlr.setFormatter(formatter)
logger.addHandler(hdlr)
有什么想法吗?
您可以创建一个 logging.Filter
来跟踪最后记录的记录并过滤掉任何重复(相似)的记录,例如:
import logging
class DuplicateFilter(logging.Filter):
def filter(self, record):
# add other fields if you need more granular comparison, depends on your app
current_log = (record.module, record.levelno, record.msg)
if current_log != getattr(self, "last_log", None):
self.last_log = current_log
return True
return False
然后只需将它添加到您使用的 logger/handler(即 hdlr.addFilter(DuplicateFilter())
)或根记录器以过滤所有默认日志。这是一个简单的测试:
import logging
logging.warn("my test")
logging.warn("my repeated test")
logging.warn("my repeated test")
logging.warn("my repeated test")
logging.warn("my other test")
logger = logging.getLogger() # get the root logger
logger.addFilter(DuplicateFilter()) # add the filter to it
logging.warn("my test")
logging.warn("my repeated test")
logging.warn("my repeated test")
logging.warn("my repeated test")
logging.warn("my other test")
这将打印出:
WARNING:root:my test
WARNING:root:my repeated test
WARNING:root:my repeated test
WARNING:root:my repeated test
WARNING:root:my other test
WARNING:root:my test
WARNING:root:my repeated test
WARNING:root:my other test
另一种选择是覆盖 logging.Logger class 以修改 _log 函数:
import logging
import zlib
class FilteredLogger(logging.Logger):
def __init__(self, name, level=logging.NOTSET):
super().__init__(name, level)
self._message_lockup = {}
def _log(self, level, msg, args, exc_info=None, extra=None, stack_info=False, log_intervall=None):
if log_intervall is None or log_intervall == 1:
super(FilteredLogger, self)._log(level, msg, args, exc_info, extra, stack_info)
else:
message_Id = zlib.crc32(msg.encode('utf-8'))
if message_Id not in self._message_lockup:
self._message_lockup[message_Id] = 0
super(FilteredLogger, self)._log(level, msg, args, exc_info, extra, stack_info)
elif self._message_lockup[message_Id] % log_intervall == 0:
msg += f' -- Suppressed {log_intervall} equal messages'
super(FilteredLogger, self)._log(level, msg, args, exc_info, extra, stack_info)
self._message_lockup[message_Id] += 1
if __name__ == '__main__':
logging.setLoggerClass(FilteredLogger)
logger = logging.getLogger('test_logger')
consol_handler = logging.StreamHandler()
logger.addHandler(consol_handler)
logger.setLevel(logging.DEBUG)
for count in range(1000):
logger.info('Test log', log_intervall=100)
输出如下所示:
Test log
Test log -- Suppressed 100 equal messages
Test log -- Suppressed 100 equal messages
Test log -- Suppressed 100 equal messages
Test log -- Suppressed 100 equal messages
Test log -- Suppressed 100 equal messages
Test log -- Suppressed 100 equal messages
Test log -- Suppressed 100 equal messages
Test log -- Suppressed 100 equal messages
Test log -- Suppressed 100 equal messages
可以轻松自定义此实现。
根据设计,我的应用程序有时会产生重复的错误,这些错误会填满日志文件并让人难以阅读。看起来像这样:
WARNING:__main__:CRON10: clock unset or no wind update received in 60 sec -> supressed rrd update
WARNING:__main__:CRON10: clock unset or no wind update received in 60 sec -> supressed rrd update
WARNING:__main__:CRON10: clock unset or no wind update received in 60 sec -> supressed rrd update
WARNING:__main__:CRON10: clock unset or no wind update received in 60 sec -> supressed rrd update
如何使用 Python 日志记录模块来抑制重复消息并输出更多 rsyslog 样式的内容 (http://www.rsyslog.com/doc/rsconf1_repeatedmsgreduction.html):
WARNING:__main__:CRON10: clock unset or no wind update received in 60 sec -> supressed rrd update
--- The last message repeated 3 times
有没有办法扩展日志记录,或者我是否必须编写一个完全自己的记录器?
我用于记录的代码是:
logging.basicConfig(format='%(asctime)s %(message)s')
logging.basicConfig(level=logging.info)
logger = logging.getLogger(__name__)
hdlr = logging.FileHandler(LOGFILE)
hdlr.setFormatter(formatter)
logger.addHandler(hdlr)
有什么想法吗?
您可以创建一个 logging.Filter
来跟踪最后记录的记录并过滤掉任何重复(相似)的记录,例如:
import logging
class DuplicateFilter(logging.Filter):
def filter(self, record):
# add other fields if you need more granular comparison, depends on your app
current_log = (record.module, record.levelno, record.msg)
if current_log != getattr(self, "last_log", None):
self.last_log = current_log
return True
return False
然后只需将它添加到您使用的 logger/handler(即 hdlr.addFilter(DuplicateFilter())
)或根记录器以过滤所有默认日志。这是一个简单的测试:
import logging
logging.warn("my test")
logging.warn("my repeated test")
logging.warn("my repeated test")
logging.warn("my repeated test")
logging.warn("my other test")
logger = logging.getLogger() # get the root logger
logger.addFilter(DuplicateFilter()) # add the filter to it
logging.warn("my test")
logging.warn("my repeated test")
logging.warn("my repeated test")
logging.warn("my repeated test")
logging.warn("my other test")
这将打印出:
WARNING:root:my test WARNING:root:my repeated test WARNING:root:my repeated test WARNING:root:my repeated test WARNING:root:my other test WARNING:root:my test WARNING:root:my repeated test WARNING:root:my other test
另一种选择是覆盖 logging.Logger class 以修改 _log 函数:
import logging
import zlib
class FilteredLogger(logging.Logger):
def __init__(self, name, level=logging.NOTSET):
super().__init__(name, level)
self._message_lockup = {}
def _log(self, level, msg, args, exc_info=None, extra=None, stack_info=False, log_intervall=None):
if log_intervall is None or log_intervall == 1:
super(FilteredLogger, self)._log(level, msg, args, exc_info, extra, stack_info)
else:
message_Id = zlib.crc32(msg.encode('utf-8'))
if message_Id not in self._message_lockup:
self._message_lockup[message_Id] = 0
super(FilteredLogger, self)._log(level, msg, args, exc_info, extra, stack_info)
elif self._message_lockup[message_Id] % log_intervall == 0:
msg += f' -- Suppressed {log_intervall} equal messages'
super(FilteredLogger, self)._log(level, msg, args, exc_info, extra, stack_info)
self._message_lockup[message_Id] += 1
if __name__ == '__main__':
logging.setLoggerClass(FilteredLogger)
logger = logging.getLogger('test_logger')
consol_handler = logging.StreamHandler()
logger.addHandler(consol_handler)
logger.setLevel(logging.DEBUG)
for count in range(1000):
logger.info('Test log', log_intervall=100)
输出如下所示:
Test log
Test log -- Suppressed 100 equal messages
Test log -- Suppressed 100 equal messages
Test log -- Suppressed 100 equal messages
Test log -- Suppressed 100 equal messages
Test log -- Suppressed 100 equal messages
Test log -- Suppressed 100 equal messages
Test log -- Suppressed 100 equal messages
Test log -- Suppressed 100 equal messages
Test log -- Suppressed 100 equal messages
可以轻松自定义此实现。