This commit is contained in:
Iliyan Angelov
2025-12-06 03:27:35 +02:00
parent 7667eb5eda
commit 5a8ca3c475
2211 changed files with 28086 additions and 37066 deletions

View File

@@ -1,33 +1,31 @@
# mypy: allow-untyped-defs
"""Access and control log capturing."""
from __future__ import annotations
from collections.abc import Generator
from collections.abc import Mapping
from collections.abc import Set as AbstractSet
import io
import logging
import os
import re
from contextlib import contextmanager
from contextlib import nullcontext
from datetime import datetime
from datetime import timedelta
from datetime import timezone
import io
from io import StringIO
import logging
from logging import LogRecord
import os
from pathlib import Path
import re
from types import TracebackType
from typing import final
from typing import Generic
from typing import Literal
from typing import AbstractSet
from typing import Dict
from typing import Generator
from typing import List
from typing import Mapping
from typing import Optional
from typing import Tuple
from typing import TYPE_CHECKING
from typing import TypeVar
from typing import Union
from _pytest import nodes
from _pytest._io import TerminalWriter
from _pytest.capture import CaptureManager
from _pytest.compat import final
from _pytest.config import _strtobool
from _pytest.config import Config
from _pytest.config import create_terminal_writer
@@ -41,9 +39,10 @@ from _pytest.main import Session
from _pytest.stash import StashKey
from _pytest.terminal import TerminalReporter
if TYPE_CHECKING:
logging_StreamHandler = logging.StreamHandler[StringIO]
from typing_extensions import Literal
else:
logging_StreamHandler = logging.StreamHandler
@@ -51,7 +50,7 @@ DEFAULT_LOG_FORMAT = "%(levelname)-8s %(name)s:%(filename)s:%(lineno)d %(message
DEFAULT_LOG_DATE_FORMAT = "%H:%M:%S"
_ANSI_ESCAPE_SEQ = re.compile(r"\x1b\[[\d;]+m")
caplog_handler_key = StashKey["LogCaptureHandler"]()
caplog_records_key = StashKey[dict[str, list[logging.LogRecord]]]()
caplog_records_key = StashKey[Dict[str, List[logging.LogRecord]]]()
def _remove_ansi_escape_sequences(text: str) -> str:
@@ -64,14 +63,13 @@ class DatetimeFormatter(logging.Formatter):
:func:`time.strftime` in case of microseconds in format string.
"""
def formatTime(self, record: LogRecord, datefmt: str | None = None) -> str:
def formatTime(self, record: LogRecord, datefmt=None) -> str:
if datefmt and "%f" in datefmt:
ct = self.converter(record.created)
tz = timezone(timedelta(seconds=ct.tm_gmtoff), ct.tm_zone)
# Construct `datetime.datetime` object from `struct_time`
# and msecs information from `record`
# Using int() instead of round() to avoid it exceeding 1_000_000 and causing a ValueError (#11861).
dt = datetime(*ct[0:6], microsecond=int(record.msecs * 1000), tzinfo=tz)
dt = datetime(*ct[0:6], microsecond=round(record.msecs * 1000), tzinfo=tz)
return dt.strftime(datefmt)
# Use `logging.Formatter` for non-microsecond formats
return super().formatTime(record, datefmt)
@@ -96,7 +94,7 @@ class ColoredLevelFormatter(DatetimeFormatter):
super().__init__(*args, **kwargs)
self._terminalwriter = terminalwriter
self._original_fmt = self._style._fmt
self._level_to_fmt_mapping: dict[int, str] = {}
self._level_to_fmt_mapping: Dict[int, str] = {}
for level, color_opts in self.LOGLEVEL_COLOROPTS.items():
self.add_color_level(level, *color_opts)
@@ -114,6 +112,7 @@ class ColoredLevelFormatter(DatetimeFormatter):
.. warning::
This is an experimental API.
"""
assert self._fmt is not None
levelname_fmt_match = self.LEVELNAME_FMT_REGEX.search(self._fmt)
if not levelname_fmt_match:
@@ -144,12 +143,12 @@ class PercentStyleMultiline(logging.PercentStyle):
formats the message as if each line were logged separately.
"""
def __init__(self, fmt: str, auto_indent: int | str | bool | None) -> None:
def __init__(self, fmt: str, auto_indent: Union[int, str, bool, None]) -> None:
super().__init__(fmt)
self._auto_indent = self._get_auto_indent(auto_indent)
@staticmethod
def _get_auto_indent(auto_indent_option: int | str | bool | None) -> int:
def _get_auto_indent(auto_indent_option: Union[int, str, bool, None]) -> int:
"""Determine the current auto indentation setting.
Specify auto indent behavior (on/off/fixed) by passing in
@@ -180,6 +179,7 @@ class PercentStyleMultiline(logging.PercentStyle):
0 (auto-indent turned off) or
>0 (explicitly set indentation position).
"""
if auto_indent_option is None:
return 0
elif isinstance(auto_indent_option, bool):
@@ -206,7 +206,7 @@ class PercentStyleMultiline(logging.PercentStyle):
if "\n" in record.message:
if hasattr(record, "auto_indent"):
# Passed in from the "extra={}" kwarg on the call to logging.log().
auto_indent = self._get_auto_indent(record.auto_indent)
auto_indent = self._get_auto_indent(record.auto_indent) # type: ignore[attr-defined]
else:
auto_indent = self._auto_indent
@@ -295,13 +295,6 @@ def pytest_addoption(parser: Parser) -> None:
default=None,
help="Path to a file when logging will be written to",
)
add_option_ini(
"--log-file-mode",
dest="log_file_mode",
default="w",
choices=["w", "a"],
help="Log file open mode",
)
add_option_ini(
"--log-file-level",
dest="log_file_level",
@@ -311,13 +304,13 @@ def pytest_addoption(parser: Parser) -> None:
add_option_ini(
"--log-file-format",
dest="log_file_format",
default=None,
default=DEFAULT_LOG_FORMAT,
help="Log format used by the logging module",
)
add_option_ini(
"--log-file-date-format",
dest="log_file_date_format",
default=None,
default=DEFAULT_LOG_DATE_FORMAT,
help="Log date format used by the logging module",
)
add_option_ini(
@@ -339,16 +332,16 @@ _HandlerType = TypeVar("_HandlerType", bound=logging.Handler)
# Not using @contextmanager for performance reasons.
class catching_logs(Generic[_HandlerType]):
class catching_logs:
"""Context manager that prepares the whole logging machinery properly."""
__slots__ = ("handler", "level", "orig_level")
def __init__(self, handler: _HandlerType, level: int | None = None) -> None:
def __init__(self, handler: _HandlerType, level: Optional[int] = None) -> None:
self.handler = handler
self.level = level
def __enter__(self) -> _HandlerType:
def __enter__(self):
root_logger = logging.getLogger()
if self.level is not None:
self.handler.setLevel(self.level)
@@ -358,12 +351,7 @@ class catching_logs(Generic[_HandlerType]):
root_logger.setLevel(min(self.orig_level, self.level))
return self.handler
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
def __exit__(self, type, value, traceback):
root_logger = logging.getLogger()
if self.level is not None:
root_logger.setLevel(self.orig_level)
@@ -376,7 +364,7 @@ class LogCaptureHandler(logging_StreamHandler):
def __init__(self) -> None:
"""Create a new log handler."""
super().__init__(StringIO())
self.records: list[logging.LogRecord] = []
self.records: List[logging.LogRecord] = []
def emit(self, record: logging.LogRecord) -> None:
"""Keep the log records in a list in addition to the log text."""
@@ -397,7 +385,7 @@ class LogCaptureHandler(logging_StreamHandler):
# The default behavior of logging is to print "Logging error"
# to stderr with the call stack and some extra details.
# pytest wants to make such mistakes visible during testing.
raise # noqa: PLE0704
raise
@final
@@ -407,10 +395,10 @@ class LogCaptureFixture:
def __init__(self, item: nodes.Node, *, _ispytest: bool = False) -> None:
check_ispytest(_ispytest)
self._item = item
self._initial_handler_level: int | None = None
self._initial_handler_level: Optional[int] = None
# Dict of log name -> log level.
self._initial_logger_levels: dict[str | None, int] = {}
self._initial_disabled_logging_level: int | None = None
self._initial_logger_levels: Dict[Optional[str], int] = {}
self._initial_disabled_logging_level: Optional[int] = None
def _finalize(self) -> None:
"""Finalize the fixture.
@@ -434,8 +422,8 @@ class LogCaptureFixture:
return self._item.stash[caplog_handler_key]
def get_records(
self, when: Literal["setup", "call", "teardown"]
) -> list[logging.LogRecord]:
self, when: "Literal['setup', 'call', 'teardown']"
) -> List[logging.LogRecord]:
"""Get the logging records for one of the possible test phases.
:param when:
@@ -454,12 +442,12 @@ class LogCaptureFixture:
return _remove_ansi_escape_sequences(self.handler.stream.getvalue())
@property
def records(self) -> list[logging.LogRecord]:
def records(self) -> List[logging.LogRecord]:
"""The list of log records."""
return self.handler.records
@property
def record_tuples(self) -> list[tuple[str, int, str]]:
def record_tuples(self) -> List[Tuple[str, int, str]]:
"""A list of a stripped down version of log records intended
for use in assertion comparison.
@@ -470,7 +458,7 @@ class LogCaptureFixture:
return [(r.name, r.levelno, r.getMessage()) for r in self.records]
@property
def messages(self) -> list[str]:
def messages(self) -> List[str]:
"""A list of format-interpolated log messages.
Unlike 'records', which contains the format string and parameters for
@@ -493,7 +481,7 @@ class LogCaptureFixture:
self.handler.clear()
def _force_enable_logging(
self, level: int | str, logger_obj: logging.Logger
self, level: Union[int, str], logger_obj: logging.Logger
) -> int:
"""Enable the desired logging level if the global level was disabled via ``logging.disabled``.
@@ -509,7 +497,7 @@ class LogCaptureFixture:
:return: The original disabled logging level.
"""
original_disable_level: int = logger_obj.manager.disable
original_disable_level: int = logger_obj.manager.disable # type: ignore[attr-defined]
if isinstance(level, str):
# Try to translate the level string to an int for `logging.disable()`
@@ -526,7 +514,7 @@ class LogCaptureFixture:
return original_disable_level
def set_level(self, level: int | str, logger: str | None = None) -> None:
def set_level(self, level: Union[int, str], logger: Optional[str] = None) -> None:
"""Set the threshold level of a logger for the duration of a test.
Logging messages which are less severe than this level will not be captured.
@@ -535,7 +523,7 @@ class LogCaptureFixture:
The levels of the loggers changed by this function will be
restored to their initial values at the end of the test.
Will enable the requested logging level if it was disabled via :func:`logging.disable`.
Will enable the requested logging level if it was disabled via :meth:`logging.disable`.
:param level: The level.
:param logger: The logger to update. If not given, the root logger.
@@ -552,12 +540,14 @@ class LogCaptureFixture:
self._initial_disabled_logging_level = initial_disabled_logging_level
@contextmanager
def at_level(self, level: int | str, logger: str | None = None) -> Generator[None]:
def at_level(
self, level: Union[int, str], logger: Optional[str] = None
) -> Generator[None, None, None]:
"""Context manager that sets the level for capturing of logs. After
the end of the 'with' statement the level is restored to its original
value.
Will enable the requested logging level if it was disabled via :func:`logging.disable`.
Will enable the requested logging level if it was disabled via :meth:`logging.disable`.
:param level: The level.
:param logger: The logger to update. If not given, the root logger.
@@ -575,25 +565,9 @@ class LogCaptureFixture:
self.handler.setLevel(handler_orig_level)
logging.disable(original_disable_level)
@contextmanager
def filtering(self, filter_: logging.Filter) -> Generator[None]:
"""Context manager that temporarily adds the given filter to the caplog's
:meth:`handler` for the 'with' statement block, and removes that filter at the
end of the block.
:param filter_: A custom :class:`logging.Filter` object.
.. versionadded:: 7.5
"""
self.handler.addFilter(filter_)
try:
yield
finally:
self.handler.removeFilter(filter_)
@fixture
def caplog(request: FixtureRequest) -> Generator[LogCaptureFixture]:
def caplog(request: FixtureRequest) -> Generator[LogCaptureFixture, None, None]:
"""Access and control log capturing.
Captured logs are available through the following properties/methods::
@@ -609,7 +583,7 @@ def caplog(request: FixtureRequest) -> Generator[LogCaptureFixture]:
result._finalize()
def get_log_level_for_setting(config: Config, *setting_names: str) -> int | None:
def get_log_level_for_setting(config: Config, *setting_names: str) -> Optional[int]:
for setting_name in setting_names:
log_level = config.getoption(setting_name)
if log_level is None:
@@ -626,9 +600,9 @@ def get_log_level_for_setting(config: Config, *setting_names: str) -> int | None
except ValueError as e:
# Python logging does not recognise this as a logging level
raise UsageError(
f"'{log_level}' is not recognized as a logging level name for "
f"'{setting_name}'. Please consider passing the "
"logging level num instead."
"'{}' is not recognized as a logging level name for "
"'{}'. Please consider passing the "
"logging level num instead.".format(log_level, setting_name)
) from e
@@ -662,19 +636,14 @@ class LoggingPlugin:
self.report_handler.setFormatter(self.formatter)
# File logging.
self.log_file_level = get_log_level_for_setting(
config, "log_file_level", "log_level"
)
self.log_file_level = get_log_level_for_setting(config, "log_file_level")
log_file = get_option_ini(config, "log_file") or os.devnull
if log_file != os.devnull:
directory = os.path.dirname(os.path.abspath(log_file))
if not os.path.isdir(directory):
os.makedirs(directory)
self.log_file_mode = get_option_ini(config, "log_file_mode") or "w"
self.log_file_handler = _FileHandler(
log_file, mode=self.log_file_mode, encoding="UTF-8"
)
self.log_file_handler = _FileHandler(log_file, mode="w", encoding="UTF-8")
log_file_format = get_option_ini(config, "log_file_format", "log_format")
log_file_date_format = get_option_ini(
config, "log_file_date_format", "log_date_format"
@@ -695,9 +664,9 @@ class LoggingPlugin:
assert terminal_reporter is not None
capture_manager = config.pluginmanager.get_plugin("capturemanager")
# if capturemanager plugin is disabled, live logging still works.
self.log_cli_handler: (
_LiveLoggingStreamHandler | _LiveLoggingNullHandler
) = _LiveLoggingStreamHandler(terminal_reporter, capture_manager)
self.log_cli_handler: Union[
_LiveLoggingStreamHandler, _LiveLoggingNullHandler
] = _LiveLoggingStreamHandler(terminal_reporter, capture_manager)
else:
self.log_cli_handler = _LiveLoggingNullHandler()
log_cli_formatter = self._create_formatter(
@@ -708,7 +677,7 @@ class LoggingPlugin:
self.log_cli_handler.setFormatter(log_cli_formatter)
self._disable_loggers(loggers_to_disable=config.option.logger_disable)
def _disable_loggers(self, loggers_to_disable: list[str]) -> None:
def _disable_loggers(self, loggers_to_disable: List[str]) -> None:
if not loggers_to_disable:
return
@@ -751,12 +720,12 @@ class LoggingPlugin:
fpath.parent.mkdir(exist_ok=True, parents=True)
# https://github.com/python/mypy/issues/11193
stream: io.TextIOWrapper = fpath.open(mode=self.log_file_mode, encoding="UTF-8") # type: ignore[assignment]
stream: io.TextIOWrapper = fpath.open(mode="w", encoding="UTF-8") # type: ignore[assignment]
old_stream = self.log_file_handler.setStream(stream)
if old_stream:
old_stream.close()
def _log_cli_enabled(self) -> bool:
def _log_cli_enabled(self):
"""Return whether live logging is enabled."""
enabled = self._config.getoption(
"--log-cli-level"
@@ -771,34 +740,35 @@ class LoggingPlugin:
return True
@hookimpl(wrapper=True, tryfirst=True)
def pytest_sessionstart(self) -> Generator[None]:
@hookimpl(hookwrapper=True, tryfirst=True)
def pytest_sessionstart(self) -> Generator[None, None, None]:
self.log_cli_handler.set_when("sessionstart")
with catching_logs(self.log_cli_handler, level=self.log_cli_level):
with catching_logs(self.log_file_handler, level=self.log_file_level):
return (yield)
yield
@hookimpl(wrapper=True, tryfirst=True)
def pytest_collection(self) -> Generator[None]:
@hookimpl(hookwrapper=True, tryfirst=True)
def pytest_collection(self) -> Generator[None, None, None]:
self.log_cli_handler.set_when("collection")
with catching_logs(self.log_cli_handler, level=self.log_cli_level):
with catching_logs(self.log_file_handler, level=self.log_file_level):
return (yield)
yield
@hookimpl(wrapper=True)
def pytest_runtestloop(self, session: Session) -> Generator[None, object, object]:
@hookimpl(hookwrapper=True)
def pytest_runtestloop(self, session: Session) -> Generator[None, None, None]:
if session.config.option.collectonly:
return (yield)
yield
return
if self._log_cli_enabled() and self._config.get_verbosity() < 1:
if self._log_cli_enabled() and self._config.getoption("verbose") < 1:
# The verbose flag is needed to avoid messy test progress output.
self._config.option.verbose = 1
with catching_logs(self.log_cli_handler, level=self.log_cli_level):
with catching_logs(self.log_file_handler, level=self.log_file_level):
return (yield) # Run all the tests.
yield # Run all the tests.
@hookimpl
def pytest_runtest_logstart(self) -> None:
@@ -809,68 +779,58 @@ class LoggingPlugin:
def pytest_runtest_logreport(self) -> None:
self.log_cli_handler.set_when("logreport")
@contextmanager
def _runtest_for(self, item: nodes.Item, when: str) -> Generator[None]:
def _runtest_for(self, item: nodes.Item, when: str) -> Generator[None, None, None]:
"""Implement the internals of the pytest_runtest_xxx() hooks."""
with (
catching_logs(
self.caplog_handler,
level=self.log_level,
) as caplog_handler,
catching_logs(
self.report_handler,
level=self.log_level,
) as report_handler,
):
with catching_logs(
self.caplog_handler,
level=self.log_level,
) as caplog_handler, catching_logs(
self.report_handler,
level=self.log_level,
) as report_handler:
caplog_handler.reset()
report_handler.reset()
item.stash[caplog_records_key][when] = caplog_handler.records
item.stash[caplog_handler_key] = caplog_handler
try:
yield
finally:
log = report_handler.stream.getvalue().strip()
item.add_report_section(when, "log", log)
yield
@hookimpl(wrapper=True)
def pytest_runtest_setup(self, item: nodes.Item) -> Generator[None]:
log = report_handler.stream.getvalue().strip()
item.add_report_section(when, "log", log)
@hookimpl(hookwrapper=True)
def pytest_runtest_setup(self, item: nodes.Item) -> Generator[None, None, None]:
self.log_cli_handler.set_when("setup")
empty: dict[str, list[logging.LogRecord]] = {}
empty: Dict[str, List[logging.LogRecord]] = {}
item.stash[caplog_records_key] = empty
with self._runtest_for(item, "setup"):
yield
yield from self._runtest_for(item, "setup")
@hookimpl(wrapper=True)
def pytest_runtest_call(self, item: nodes.Item) -> Generator[None]:
@hookimpl(hookwrapper=True)
def pytest_runtest_call(self, item: nodes.Item) -> Generator[None, None, None]:
self.log_cli_handler.set_when("call")
with self._runtest_for(item, "call"):
yield
yield from self._runtest_for(item, "call")
@hookimpl(wrapper=True)
def pytest_runtest_teardown(self, item: nodes.Item) -> Generator[None]:
@hookimpl(hookwrapper=True)
def pytest_runtest_teardown(self, item: nodes.Item) -> Generator[None, None, None]:
self.log_cli_handler.set_when("teardown")
try:
with self._runtest_for(item, "teardown"):
yield
finally:
del item.stash[caplog_records_key]
del item.stash[caplog_handler_key]
yield from self._runtest_for(item, "teardown")
del item.stash[caplog_records_key]
del item.stash[caplog_handler_key]
@hookimpl
def pytest_runtest_logfinish(self) -> None:
self.log_cli_handler.set_when("finish")
@hookimpl(wrapper=True, tryfirst=True)
def pytest_sessionfinish(self) -> Generator[None]:
@hookimpl(hookwrapper=True, tryfirst=True)
def pytest_sessionfinish(self) -> Generator[None, None, None]:
self.log_cli_handler.set_when("sessionfinish")
with catching_logs(self.log_cli_handler, level=self.log_cli_level):
with catching_logs(self.log_file_handler, level=self.log_file_level):
return (yield)
yield
@hookimpl
def pytest_unconfigure(self) -> None:
@@ -903,7 +863,7 @@ class _LiveLoggingStreamHandler(logging_StreamHandler):
def __init__(
self,
terminal_reporter: TerminalReporter,
capture_manager: CaptureManager | None,
capture_manager: Optional[CaptureManager],
) -> None:
super().__init__(stream=terminal_reporter) # type: ignore[arg-type]
self.capture_manager = capture_manager
@@ -915,7 +875,7 @@ class _LiveLoggingStreamHandler(logging_StreamHandler):
"""Reset the handler; should be called before the start of each test."""
self._first_record_emitted = False
def set_when(self, when: str | None) -> None:
def set_when(self, when: Optional[str]) -> None:
"""Prepare for the given test phase (setup/call/teardown)."""
self._when = when
self._section_name_shown = False