import logging import os from collections import deque from datetime import datetime, timedelta, timezone from logging.handlers import TimedRotatingFileHandler from typing import Any from services import runtime_state def _get_path(cfg: dict[str, Any]) -> str: return cfg.get("incidents", {}).get("path", "/var/server-bot/incidents.log") def incidents_path(cfg: dict[str, Any]) -> str: return _get_path(cfg) def _get_logger(cfg: dict[str, Any]) -> logging.Logger: logger = logging.getLogger("incidents") if logger.handlers: return logger path = _get_path(cfg) os.makedirs(os.path.dirname(path), exist_ok=True) rotate_when = cfg.get("incidents", {}).get("rotate_when", "W0") backup_count = int(cfg.get("incidents", {}).get("backup_count", 8)) handler = TimedRotatingFileHandler( path, when=rotate_when, interval=1, backupCount=backup_count, encoding="utf-8", utc=True, ) formatter = logging.Formatter( "%(asctime)s\t%(message)s", datefmt="%Y-%m-%dT%H:%M:%SZ", ) handler.setFormatter(formatter) logger.setLevel(logging.INFO) logger.addHandler(handler) logger.propagate = False return logger def log_incident(cfg: dict[str, Any], text: str, category: str | None = None) -> None: if not cfg.get("incidents", {}).get("enabled", True): return if category and "category=" not in text: text = f"category={category} {text}" logger = _get_logger(cfg) logger.info(text) def _parse_line(line: str) -> tuple[datetime | None, str]: if "\t" not in line: return None, line.strip() ts, msg = line.split("\t", 1) try: dt = datetime.strptime(ts.strip(), "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc) except ValueError: dt = None return dt, msg.strip() def read_recent(cfg: dict[str, Any], hours: int, limit: int = 200) -> list[str]: return [f"{dt:%Y-%m-%d %H:%M} {msg}" for dt, msg in read_raw(cfg, hours, limit=limit)] def read_raw(cfg: dict[str, Any], hours: int, limit: int = 200, *, include_old: bool = False) -> list[tuple[datetime, str]]: path = _get_path(cfg) if not os.path.exists(path): return [] since = datetime.now(timezone.utc) - timedelta(hours=hours) lines = deque(maxlen=limit) with open(path, "r", encoding="utf-8", errors="replace") as f: for line in f: dt, msg = _parse_line(line.rstrip()) if dt is None: continue if not include_old and dt < since: continue lines.append((dt, msg)) return list(lines) def infer_category(text: str) -> str | None: lower = text.lower() if "category=" in lower: import re m = re.search(r"category=([a-z0-9_-]+)", lower) if m: return m.group(1) if "load" in lower: return "load" if "docker" in lower: return "docker" if "restic" in lower or "backup" in lower: return "backup" if "smart" in lower: return "smart" if "ssl" in lower or "cert" in lower: return "ssl" if "npmplus" in lower: return "npmplus" if "gitea" in lower: return "gitea" if "openwrt" in lower: return "openwrt" if "queue" in lower: return "queue" if "selftest" in lower: return "selftest" return None