当日志在另一个进程中发出时,caplog 中的消息为空

Empty messages in caplog when logs are emitted in a different process

我在运行测试时设置了 log_cli=true。脚本如下:

import logging
import sys
from multiprocessing import Process

# Ask logging.basicConfig for a stderr stream handler at DEBUG level.
logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)

# Logger that ActorContext writes to; the tests pass this same name to
# caplog.at_level().
logger = logging.getLogger("leapp.actors.quagga_report")


class ActorContext:
    """Minimal actor stand-in that logs through the module-level logger."""

    def __init__(self):
        # Keep a reference to the shared module logger on the instance.
        self.log = logger

    def run(self):
        """Emit one DEBUG record whose text is "Some msg"."""
        message = "Some msg"
        self.log.debug(message)


# Shared module-level instance; the tests call its `run` method directly and
# also hand it to multiprocessing.Process as the target.
current_actor_context = ActorContext()


def test_caplog_fails(caplog):
    """Log from a child process and expect caplog to contain the message.

    This is the reproduction case from the report: the record is emitted in
    a separate process, and the assertion on ``caplog.text`` fails.
    """
    logger_name = "leapp.actors.quagga_report"
    # Constructing the Process does not start it; that happens in start().
    worker = Process(target=current_actor_context.run)
    with caplog.at_level(logging.DEBUG, logger=logger_name):
        worker.start()
        worker.join()
    captured_text = caplog.text
    assert "Some msg" in captured_text


def test_caplog_passes(caplog):
    """Log from the test's own process and check caplog captured the message."""
    capture_scope = caplog.at_level(
        logging.DEBUG, logger="leapp.actors.quagga_report"
    )
    with capture_scope:
        current_actor_context.run()
    captured_text = caplog.text
    assert "Some msg" in captured_text

pytest 的 log_cli 会显示两个测试中的日志消息,但是 caplog 只能捕获到第二个测试的消息。

第一个测试失败,回溯如下:

-------------------------------- live log call ---------------------------------
| 13:39:20 | 40212 | leapp.actors.quagga_report | DEBUG | test_logger_caplog_fails.py | Some msg
FAILED
tests/test_logger_caplog_fails.py:20 (test_caplog_fails)
Traceback (most recent call last):
  File "/home/azhukov/Dropbox/code/lighting_talks/asyncio_subprocess_shells/tests/test_logger_caplog_fails.py", line 26, in test_caplog_fails
    assert "Some msg" in caplog.text
AssertionError: assert 'Some msg' in ''
 +  where '' = <_pytest.logging.LogCaptureFixture object at 0x7fb8a87f2370>.text

我查看过一个类似的问题,但是在我的情况下,属性 propagate=True。

受 @hoefling 的启发,并且仍然希望使用 caplog,这里有一个解决方案。思路是创建一个夹具(fixture),它从 QueueHandler 处理程序的队列中取出日志记录,并在主进程中重新发出,这样这些日志就可以被 caplog 捕获:
import logging
import sys
from contextlib import contextmanager
from logging import handlers
from multiprocessing import Process, Queue

import pytest

# Ask logging.basicConfig for a stderr stream handler at DEBUG level.
logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)


# Module-level logger named after this module; ActorContext logs through it.
logger = logging.getLogger(__name__)


class ActorContext:
    """Small helper object whose ``run`` writes one DEBUG log line."""

    def __init__(self):
        # The instance logs through the logger defined at module level.
        self.log = logger

    def run(self):
        """Log the fixed text "Some msg" at DEBUG level."""
        text = "Some msg"
        self.log.debug(text)


# Shared module-level instance; the tests call its `run` method directly and
# also hand it to multiprocessing.Process as the target.
current_actor_context = ActorContext()


@pytest.fixture()
def caplog_workaround():
    """Provide a context manager that replays other-process logs for caplog.

    The fixture value is a zero-argument callable. Used as
    ``with caplog_workaround(): ...`` it attaches a ``QueueHandler`` backed by
    a ``multiprocessing.Queue`` to the root logger for the duration of the
    block. On exit the handler is detached and every queued record that came
    from another process is handed to the root logger's handlers in this
    process, which is where caplog can see it.

    NOTE(review): this relies on the child process inheriting the root
    logger's handlers (as happens with the ``fork`` start method); with
    ``spawn`` the child would presumably not have the QueueHandler -- confirm
    for the target platform.
    """

    @contextmanager
    def ctx():
        # Local import keeps this block self-contained; os is only needed to
        # recognise records that this very process produced.
        import os

        logger_queue = Queue()
        queue_handler = handlers.QueueHandler(logger_queue)
        root_logger = logging.getLogger()
        root_logger.addHandler(queue_handler)
        try:
            yield
        finally:
            # Detach before replaying: otherwise replayed records would be
            # pushed straight back into the queue being drained, and the
            # handler would stay on the root logger after the block ends.
            root_logger.removeHandler(queue_handler)
            own_pid = os.getpid()
            while not logger_queue.empty():
                log_record: logging.LogRecord = logger_queue.get()
                if log_record.process == own_pid:
                    # Emitted in this process, so the root handlers already
                    # received it once; replaying would duplicate it.
                    continue
                # Logger.handle() dispatches the record as-is, preserving its
                # logger name, level, source location and process id.
                root_logger.handle(log_record)

    return ctx


def test_caplog_already_not_fails(caplog, caplog_workaround):
    """Log from a child process and check caplog sees it via the workaround.

    ``caplog_workaround()`` is entered inside ``caplog.at_level`` so that it
    exits first, while the capture level is still in effect.
    """
    # Target the logger ActorContext really writes to (the module-level
    # ``logger``) instead of a hard-coded name that nothing logs through.
    with caplog.at_level(logging.DEBUG, logger=logger.name), caplog_workaround():
        p = Process(target=current_actor_context.run)
        p.start()
        p.join()
    assert "Some msg" in caplog.text


def test_caplog_passes(caplog, capsys):
    """Log from the test's own process and check caplog captured the message."""
    # NOTE(review): capsys is requested but never read in this test; it is
    # kept so the set of active fixtures stays unchanged.
    # Target the logger ActorContext really writes to (the module-level
    # ``logger``) instead of a hard-coded name that nothing logs through.
    with caplog.at_level(logging.DEBUG, logger=logger.name):
        current_actor_context.run()
    assert "Some msg" in caplog.text