|
- # pretty_logger.py
-
- import json
- import logging
- import logging.handlers
- import sys
- import threading
- import time
- from contextlib import ContextDecorator
- from functools import wraps
-
- # ANSI escape codes for colors
- RESET = "\x1b[0m"
- BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = (
- "\x1b[30m",
- "\x1b[31m",
- "\x1b[32m",
- "\x1b[33m",
- "\x1b[34m",
- "\x1b[35m",
- "\x1b[36m",
- "\x1b[37m",
- )
-
- _LEVEL_COLORS = {
- logging.DEBUG: CYAN,
- logging.INFO: GREEN,
- logging.WARNING: YELLOW,
- logging.ERROR: RED,
- logging.CRITICAL: MAGENTA,
- }
-
-
- def _supports_color() -> bool:
- """
- Detect whether the running terminal supports ANSI colors.
- """
- if hasattr(sys.stdout, "isatty") and sys.stdout.isatty():
- platform = sys.platform
- if platform == "win32":
- # On Windows, modern terminals may support ANSI; assume yes for Windows 10+
- return True
- return True
- return False
-
-
- class PrettyFormatter(logging.Formatter):
- """
- A logging.Formatter that outputs colorized logs to the console
- (if enabled) and supports JSON formatting (if requested).
- """
-
- def __init__(
- self,
- fmt: str = None,
- datefmt: str = "%Y-%m-%d %H:%M:%S",
- use_colors: bool = True,
- json_output: bool = False,
- ):
- super().__init__(fmt=fmt, datefmt=datefmt)
- self.use_colors = use_colors and _supports_color()
- self.json_output = json_output
-
- def format(self, record: logging.LogRecord) -> str:
- # If JSON output is requested, dump a dict of relevant fields
- if self.json_output:
- log_record = {
- "timestamp": self.formatTime(record, self.datefmt),
- "name": record.name,
- "level": record.levelname,
- "message": record.getMessage(),
- }
- # Include extra/contextual fields
- for k, v in record.__dict__.get("extra_fields", {}).items():
- log_record[k] = v
- if record.exc_info:
- log_record["exception"] = self.formatException(record.exc_info)
- return json.dumps(log_record, ensure_ascii=False)
-
- # Otherwise, build a colorized/“pretty” line
- level_color = _LEVEL_COLORS.get(record.levelno, WHITE) if self.use_colors else ""
- reset = RESET if self.use_colors else ""
- timestamp = self.formatTime(record, self.datefmt)
- name = record.name
- level = record.levelname
-
- # Basic format: [timestamp] [LEVEL] [name]: message
- base = f"[{timestamp}] [{level}] [{name}]: {record.getMessage()}"
-
- # If there are any extra fields attached via LoggerAdapter, append them
- extra_fields = record.__dict__.get("extra_fields", {})
- if extra_fields:
- # key1=val1 key2=val2 …
- extras_str = " ".join(f"{k}={v}" for k, v in extra_fields.items())
- base = f"{base} :: {extras_str}"
-
- # If exception info is present, include traceback
- if record.exc_info:
- exc_text = self.formatException(record.exc_info)
- base = f"{base}\n{exc_text}"
-
- if self.use_colors:
- return f"{level_color}{base}{reset}"
- else:
- return base
-
-
- class JsonFormatter(PrettyFormatter):
- """
- Shortcut to force JSON formatting (ignores color flags).
- """
-
- def __init__(self, datefmt: str = "%Y-%m-%d %H:%M:%S"):
- super().__init__(fmt=None, datefmt=datefmt, use_colors=False, json_output=True)
-
-
- class ContextFilter(logging.Filter):
- """
- Inject extra_fields from a LoggerAdapter into the LogRecord, so the Formatter can find them.
- """
-
- def filter(self, record: logging.LogRecord) -> bool:
- extra = getattr(record, "extra", None)
- if isinstance(extra, dict):
- record.extra_fields = extra
- else:
- record.extra_fields = {}
- return True
-
-
- class Timer(ContextDecorator):
- """
- Context manager & decorator to measure execution time of a code block or function.
- Usage as context manager:
- with Timer(logger, "block name"):
- # code …
- Usage as decorator:
- @Timer(logger, "function foo")
- def foo(...):
- ...
- """
-
- def __init__(self, logger: logging.Logger, name: str = None, level: int = logging.INFO):
- self.logger = logger
- self.name = name or ""
- self.level = level
-
- def __enter__(self):
- self.start_time = time.time()
- return self
-
- def __exit__(self, exc_type, exc, exc_tb):
- elapsed = time.time() - self.start_time
- label = f"[Timer:{self.name}]" if self.name else "[Timer]"
- self.logger.log(self.level, f"{label} elapsed {elapsed:.4f} sec")
- return False # do not suppress exceptions
-
-
- def log_function(logger: logging.Logger, level: int = logging.DEBUG):
- """
- Decorator factory to log function entry/exit and execution time.
- Usage:
- @log_function(logger, level=logging.INFO)
- def my_func(...):
- ...
- """
-
- def decorator(func):
- @wraps(func)
- def wrapper(*args, **kwargs):
- logger.log(level, f"➤ Entering {func.__name__}()")
- start = time.time()
- try:
- result = func(*args, **kwargs)
- except Exception:
- logger.exception(f"✖ Exception in {func.__name__}()")
- raise
- elapsed = time.time() - start
- logger.log(level, f"✔ Exiting {func.__name__}(), elapsed {elapsed:.4f} sec")
- return result
-
- return wrapper
-
- return decorator
-
-
- class PrettyLogger:
- """
- Encapsulates a logging.Logger with “pretty” formatting, colorized console output,
- rotating file handler, JSON option, contextual fields, etc.
- """
-
- _instances = {}
- _lock = threading.Lock()
-
- def __new__(cls, name: str = "root", **kwargs):
- """
- Implements a simple singleton: multiple calls with the same name return the same instance.
- """
- with cls._lock:
- if name not in cls._instances:
- instance = super().__new__(cls)
- cls._instances[name] = instance
- return cls._instances[name]
-
- def __init__(
- self,
- name: str = "root",
- level: int = logging.DEBUG,
- log_to_console: bool = True,
- console_level: int = logging.DEBUG,
- log_to_file: bool = False,
- log_file: str = "app.log",
- file_level: int = logging.INFO,
- max_bytes: int = 10 * 1024 * 1024, # 10 MB
- backup_count: int = 5,
- formatter: PrettyFormatter = None,
- json_output: bool = False,
- ):
- """
- Initialize (or re‐configure) a PrettyLogger.
-
- Args:
- name: logger name.
- level: root level for the logger (lowest level that will be processed).
- log_to_console: whether to attach a console (StreamHandler).
- console_level: level for console output.
- log_to_file: whether to attach a rotating file handler.
- log_file: path to the log file.
- file_level: level for file output.
- max_bytes: rotation threshold in bytes.
- backup_count: number of backup files to keep.
- formatter: an instance of PrettyFormatter. If None, a default is created.
- json_output: if True, forces JSON formatting on all handlers.
- """
- # Avoid re‐initializing if already done
- logger = logging.getLogger(name)
- if getattr(logger, "_pretty_logger_inited", False):
- return
-
- self.logger = logger
- self.logger.setLevel(level)
- self.logger.propagate = False # avoid double‐logging if root logger is configured elsewhere
-
- # Create a default formatter if not supplied
- if formatter is None:
- fmt = "[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s"
- formatter = PrettyFormatter(
- fmt=fmt, use_colors=not json_output, json_output=json_output
- )
-
- # Add ContextFilter so extra_fields is always present
- context_filter = ContextFilter()
- self.logger.addFilter(context_filter)
-
- # Console handler
- if log_to_console:
- ch = logging.StreamHandler(sys.stdout)
- ch.setLevel(console_level)
- ch.setFormatter(formatter)
- self.logger.addHandler(ch)
-
- # Rotating file handler
- if log_to_file:
- fh = logging.handlers.RotatingFileHandler(
- filename=log_file,
- maxBytes=max_bytes,
- backupCount=backup_count,
- encoding="utf-8",
- )
- fh.setLevel(file_level)
- # For file, typically we don’t colorize
- file_formatter = (
- JsonFormatter(datefmt="%Y-%m-%d %H:%M:%S")
- if json_output
- else PrettyFormatter(fmt=fmt, use_colors=False, json_output=False)
- )
- fh.setFormatter(file_formatter)
- self.logger.addHandler(fh)
-
- # Mark as initialized
- setattr(self.logger, "_pretty_logger_inited", True)
-
- def addHandler(self, handler: logging.Handler):
- """
- Add a custom handler to the logger.
- This can be used to add any logging.Handler instance.
- """
- self.logger.addHandler(handler)
-
- def bind(self, **kwargs):
- """
- Return a LoggerAdapter that automatically injects extra fields into all log records.
- Example:
- ctx_logger = pretty_logger.bind(request_id=123, user="alice")
- ctx_logger.info("Something happened") # will include request_id and user in the output
- """
- return logging.LoggerAdapter(self.logger, {"extra": kwargs})
-
- def add_stream_handler(
- self, stream=None, level: int = logging.DEBUG, formatter: PrettyFormatter = None
- ):
- """
- Add an additional stream handler (e.g. to stderr).
- """
- handler = logging.StreamHandler(stream or sys.stdout)
- handler.setLevel(level)
- if formatter is None:
- fmt = "[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s"
- formatter = PrettyFormatter(fmt=fmt)
- handler.setFormatter(formatter)
- self.logger.addHandler(handler)
- return handler
-
- def add_rotating_file_handler(
- self,
- log_file: str,
- level: int = logging.INFO,
- max_bytes: int = 10 * 1024 * 1024,
- backup_count: int = 5,
- json_output: bool = False,
- ):
- """
- Add another rotating file handler at runtime.
- """
- fh = logging.handlers.RotatingFileHandler(
- filename=log_file,
- maxBytes=max_bytes,
- backupCount=backup_count,
- encoding="utf-8",
- )
- fh.setLevel(level)
- if json_output:
- formatter = JsonFormatter()
- else:
- fmt = "[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s"
- formatter = PrettyFormatter(fmt=fmt, use_colors=False, json_output=False)
- fh.setFormatter(formatter)
- self.logger.addHandler(fh)
- return fh
-
- def remove_handler(self, handler: logging.Handler):
- """
- Remove a handler from the logger.
- """
- self.logger.removeHandler(handler)
-
- def set_level(self, level: int):
- """
- Change the logger’s root level at runtime.
- """
- self.logger.setLevel(level)
-
- def get_logger(self) -> logging.Logger:
- return self.logger
-
- # Convenience methods to expose common logging calls directly from this object:
-
- def debug(self, msg, *args, **kwargs):
- self.logger.debug(msg, *args, **kwargs)
-
- def info(self, msg, *args, **kwargs):
- self.logger.info(msg, *args, **kwargs)
-
- def warning(self, msg, *args, **kwargs):
- self.logger.warning(msg, *args, **kwargs)
-
- def error(self, msg, *args, **kwargs):
- self.logger.error(msg, *args, **kwargs)
-
- def critical(self, msg, *args, **kwargs):
- self.logger.critical(msg, *args, **kwargs)
-
- def exception(self, msg, *args, exc_info=True, **kwargs):
- """
- Log an error message with exception traceback. By default exc_info=True.
- """
- self.logger.error(msg, *args, exc_info=exc_info, **kwargs)
-
-
- # Convenience function for a module‐level “global” pretty logger:
- _global_loggers = {}
- _global_lock = threading.Lock()
-
-
- def get_pretty_logger(
- name: str = "root",
- level: int = logging.DEBUG,
- log_to_console: bool = True,
- console_level: int = logging.DEBUG,
- log_to_file: bool = False,
- log_file: str = "app.log",
- file_level: int = logging.INFO,
- max_bytes: int = 10 * 1024 * 1024,
- backup_count: int = 5,
- json_output: bool = False,
- ) -> PrettyLogger:
- """
- Return a PrettyLogger instance for `name`, creating/configuring it on first use.
- Subsequent calls with the same `name` will return the same logger (singleton‐style).
- """
- with _global_lock:
- if name not in _global_loggers:
- pl = PrettyLogger(
- name=name,
- level=level,
- log_to_console=log_to_console,
- console_level=console_level,
- log_to_file=log_to_file,
- log_file=log_file,
- file_level=file_level,
- max_bytes=max_bytes,
- backup_count=backup_count,
- json_output=json_output,
- )
- _global_loggers[name] = pl
- return _global_loggers[name]
|