Source code for flexget.log

from __future__ import annotations

import contextlib
import functools
import inspect
import logging
import os
import sys
import threading
import uuid
import warnings
from collections import deque
from typing import TYPE_CHECKING

from loguru import logger

from flexget import __version__

if TYPE_CHECKING:
    from collections.abc import Callable, Iterator

    import loguru

# Custom log level that is more detailed than INFO
VERBOSE = 15

# Names of the environment variables that override the rotating log file
# defaults (1 MB per file, 9 files kept)
ENV_MAXBYTES = 'FLEXGET_LOG_MAXBYTES'
ENV_MAXCOUNT = 'FLEXGET_LOG_MAXCOUNT'

# loguru format shared by the sinks in this module, one space-separated column per entry:
# timestamp, level, logger name, task name, message
LOG_FORMAT = ' '.join(
    (
        '<green>{time:YYYY-MM-DD HH:mm:ss}</green>',
        '<level>{level: <8}</level>',
        '<cyan>{name: <13}</cyan>',
        '<bold>{extra[task]: <15}</bold>',
        '{message}',
    )
)

# Thread-local holder for the current `session_id`, used to keep track of the
# originating thread for log calls
local_context = threading.local()


@contextlib.contextmanager
def capture_logs(*args, **kwargs) -> Iterator[None]:
    """Add a temporary loguru sink that only receives messages logged inside this context.

    Takes the same arguments as `logger.add`. If no `format` is given, `LOG_FORMAT` is used.
    A `filter` argument, when given, is called with the record after the session check, so it
    must be a callable. The sink is removed and the previous session id is restored when the
    context exits.
    """
    old_id = get_log_session_id()
    # Reuse the session id already stored for this thread, otherwise start a new session
    session_id = old_id or str(uuid.uuid4())
    existing_filter = kwargs.pop('filter', None)
    kwargs.setdefault('format', LOG_FORMAT)

    def filter_func(record):
        # Only let through records that were logged from within this session
        if record['extra'].get('session_id') != session_id:
            return False
        if existing_filter:
            return existing_filter(record)
        return True

    kwargs['filter'] = filter_func

    # Add the sink before touching the thread-local state, so that a failing `logger.add`
    # (e.g. invalid sink arguments) cannot leave a stale session id behind on this thread.
    log_sink = logger.add(*args, **kwargs)
    try:
        local_context.session_id = session_id
        with logger.contextualize(session_id=session_id):
            yield
    finally:
        local_context.session_id = old_id
        logger.remove(log_sink)
def get_log_session_id() -> str | None:
    """Return the `session_id` stored on `local_context` for the current thread.

    Returns `None` when no `session_id` attribute has been set on the thread-local object.
    """
    return getattr(local_context, 'session_id', None)
def record_patcher(record: loguru.Record) -> None:
    """Promote a logger name bound through `extra` to the record's own `name` field.

    The `name` key is always removed from `record['extra']`; the record name is only
    overwritten when the removed value is truthy.
    """
    custom_name = record['extra'].pop('name', None)
    if not custom_name:
        return
    record['name'] = custom_name
class InterceptHandler(logging.Handler):
    """Catch any stdlib log messages from our deps and propagate to loguru."""

    def emit(self, record: logging.LogRecord) -> None:
        """Re-emit a stdlib `LogRecord` through loguru.

        The stdlib logger name is bound as `name`, and the record's exception info is passed on.
        """
        # Get corresponding Loguru level if it exists, otherwise fall back to the numeric level
        level: str | int
        try:
            level = logger.level(record.levelname).name
        except ValueError:
            level = record.levelno

        # Find the caller from where the logged message originated. Start at this very frame
        # (depth 0) and walk up past every frame that lives in the stdlib logging module.
        # Starting from our own frame, rather than from `logging.currentframe()` with a fixed
        # offset, keeps the depth correct regardless of how many internal frames the running
        # Python version puts between the caller and this handler. The `frame and` guard stops
        # the walk if we run out of frames.
        frame, depth = inspect.currentframe(), 0
        while frame and (depth == 0 or frame.f_code.co_filename == logging.__file__):
            frame = frame.f_back
            depth += 1

        logger.bind(name=record.name).opt(depth=depth, exception=record.exc_info).log(
            level, record.getMessage()
        )
# Lifecycle flags: set by `initialize` and `start` respectively
_logging_configured = False
_logging_started = False

# Records logged between `initialize` and `start`, plus the id of the sink collecting them
_startup_buffer: list[loguru.Record] = []
_startup_buffer_id: int | None = None

# Stores the last 100 debug messages
debug_buffer: deque[loguru.Message] = deque(maxlen=100)

# Stores filter functions
_log_filters: list[Callable[[loguru.Record], bool]] = []
def _log_filterer(record):
    """Return True only if every registered filter function accepts `record`.

    This function is what gets attached to the loguru handlers. Because it reads
    `_log_filters` on each call, filters registered later are applied as well.
    """
    for accepts in _log_filters:
        if not accepts(record):
            return False
    return True
def add_filter(func: Callable[[loguru.Record], bool]) -> None:
    """Add a filter function to the log handlers.

    :param func: Callable that receives a loguru record and returns whether it should be logged.
    """
    # `_log_filterer` iterates this list on every call, so the filter applies to later records
    _log_filters.append(func)
def remove_filter(func: Callable[[loguru.Record], bool]) -> None:
    """Remove a filter function from the log handlers.

    :param func: A filter function that is currently registered.
    :raises ValueError: If `func` is not in the list of registered filters.
    """
    _log_filters.remove(func)
def initialize(unit_test: bool = False) -> None:
    """Prepare logging.

    Registers the VERBOSE level, configures loguru's default extras and record patcher and,
    outside of unit tests, starts buffering records until `start` is run. Calling this again
    after logging has been configured is a no-op.

    :param unit_test: When True no sinks are added here and logging is marked as started.
    """
    global _logging_configured, _logging_started, _startup_buffer_id

    # Bail out before touching any sinks: a repeated call must not tear down the handlers
    # that an earlier `initialize`/`start` already installed.
    if _logging_configured:
        return

    # Remove default loguru sinks
    logger.remove()

    if 'dev' in __version__:
        warnings.filterwarnings('always', category=DeprecationWarning, module='flexget.*')
    warnings.simplefilter('once', append=True)

    logger.level('VERBOSE', no=VERBOSE, color='<bold>', icon='👄')
    # Give every loguru logger a `verbose()` method that logs at the VERBOSE level
    logger.__class__.verbose = functools.partialmethod(logger.__class__.log, 'VERBOSE')
    logger.configure(extra={'task': '', 'session_id': None}, patcher=record_patcher)

    _logging_configured = True

    # with unit test we want pytest to add the handlers
    if unit_test:
        _logging_started = True
        return

    # Store any log messages in a buffer until the `start` function is run
    _startup_buffer_id = logger.add(
        lambda message: _startup_buffer.append(message.record), level='DEBUG', format=LOG_FORMAT
    )

    # Add a handler that stores the last 100 debug lines to `debug_buffer` for use in crash reports
    logger.add(
        debug_buffer.append,
        level='DEBUG',
        format=LOG_FORMAT,
        backtrace=True,
        diagnose=True,
    )

    # Route stdlib logging through loguru
    std_logger = logging.getLogger()
    std_logger.addHandler(InterceptHandler())
def start(
    filename: str | None = None,
    level: str = 'INFO',
    to_console: bool = True,
    to_file: bool = True,
) -> None:
    """After initialization, start file and/or console logging.

    Adds the real sinks, then replays the records that were buffered since `initialize`.
    Does nothing if logging has already been started.

    :param filename: Path of the log file; no file sink is added when empty.
    :param level: Minimum level for the sinks. `'NONE'` adds no sinks at all.
    :param to_console: Whether to log to `sys.stdout`.
    :param to_file: Whether to log to `filename`.
    """
    global _logging_started, _startup_buffer, _startup_buffer_id

    assert _logging_configured, '`initialize` must be called before `start`'
    if _logging_started:
        return

    if level == 'NONE':
        # No sink will ever be added to replay the buffered records, so stop collecting them
        # and drop what we have instead of letting the buffer grow for the process lifetime.
        if _startup_buffer_id is not None:
            logger.remove(_startup_buffer_id)
            _startup_buffer_id = None
        _startup_buffer = []
        return

    # Make sure stdlib logger is set so that dependency logging gets propagated
    logging.getLogger().setLevel(logger.level(level).no)

    if to_file and filename:
        logger.add(
            filename,
            level=level,
            # Rotation size (bytes) and number of kept files can be overridden via environment
            rotation=int(os.environ.get(ENV_MAXBYTES, str(1000 * 1024))),
            retention=int(os.environ.get(ENV_MAXCOUNT, str(9))),
            encoding='utf-8',
            format=LOG_FORMAT,
            filter=_log_filterer,
        )

    # without --cron we log to console
    if to_console:
        if not sys.stdout:
            logger.debug("No sys.stdout, can't log to console.")
        else:
            # Make sure we don't send any characters that the current terminal doesn't support
            # printing. A replaced stdout may not offer `reconfigure`; log to it as-is in that case.
            reconfigure = getattr(sys.stdout, 'reconfigure', None)
            if reconfigure is not None:
                reconfigure(errors='replace')
            logger.add(sys.stdout, level=level, format=LOG_FORMAT, filter=_log_filterer)

    # Stop buffering before replaying, so the replayed records are not buffered again
    if _startup_buffer_id is not None:
        logger.remove(_startup_buffer_id)
        _startup_buffer_id = None

    # flush what we have stored from the plugin initialization
    buffered, _startup_buffer = _startup_buffer, []
    for record in buffered:
        # Re-log through the new sinks, patching the fresh record with the original one.
        # `record=record` binds the current record to the lambda at definition time.
        logger.patch(lambda r, record=record: r.update(record)).log(
            record['level'].name, record['message']
        )

    _logging_started = True