119 lines
3.4 KiB
Python
119 lines
3.4 KiB
Python
import logging
|
|
import os
|
|
from collections import deque
|
|
from datetime import datetime, timedelta, timezone
|
|
from logging.handlers import TimedRotatingFileHandler
|
|
from typing import Any
|
|
from services import runtime_state
|
|
|
|
|
|
def _get_path(cfg: dict[str, Any]) -> str:
|
|
return cfg.get("incidents", {}).get("path", "/var/server-bot/incidents.log")
|
|
|
|
|
|
def incidents_path(cfg: dict[str, Any]) -> str:
|
|
return _get_path(cfg)
|
|
|
|
|
|
def _get_logger(cfg: dict[str, Any]) -> logging.Logger:
|
|
logger = logging.getLogger("incidents")
|
|
if logger.handlers:
|
|
return logger
|
|
|
|
path = _get_path(cfg)
|
|
os.makedirs(os.path.dirname(path), exist_ok=True)
|
|
|
|
rotate_when = cfg.get("incidents", {}).get("rotate_when", "W0")
|
|
backup_count = int(cfg.get("incidents", {}).get("backup_count", 8))
|
|
handler = TimedRotatingFileHandler(
|
|
path,
|
|
when=rotate_when,
|
|
interval=1,
|
|
backupCount=backup_count,
|
|
encoding="utf-8",
|
|
utc=True,
|
|
)
|
|
formatter = logging.Formatter(
|
|
"%(asctime)s\t%(message)s",
|
|
datefmt="%Y-%m-%dT%H:%M:%SZ",
|
|
)
|
|
handler.setFormatter(formatter)
|
|
|
|
logger.setLevel(logging.INFO)
|
|
logger.addHandler(handler)
|
|
logger.propagate = False
|
|
return logger
|
|
|
|
|
|
def log_incident(cfg: dict[str, Any], text: str, category: str | None = None) -> None:
|
|
if not cfg.get("incidents", {}).get("enabled", True):
|
|
return
|
|
if category and "category=" not in text:
|
|
text = f"category={category} {text}"
|
|
logger = _get_logger(cfg)
|
|
logger.info(text)
|
|
|
|
|
|
def _parse_line(line: str) -> tuple[datetime | None, str]:
|
|
if "\t" not in line:
|
|
return None, line.strip()
|
|
ts, msg = line.split("\t", 1)
|
|
try:
|
|
dt = datetime.strptime(ts.strip(), "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)
|
|
except ValueError:
|
|
dt = None
|
|
return dt, msg.strip()
|
|
|
|
|
|
def read_recent(cfg: dict[str, Any], hours: int, limit: int = 200) -> list[str]:
|
|
return [f"{dt:%Y-%m-%d %H:%M} {msg}" for dt, msg in read_raw(cfg, hours, limit=limit)]
|
|
|
|
|
|
def read_raw(cfg: dict[str, Any], hours: int, limit: int = 200, *, include_old: bool = False) -> list[tuple[datetime, str]]:
|
|
path = _get_path(cfg)
|
|
if not os.path.exists(path):
|
|
return []
|
|
|
|
since = datetime.now(timezone.utc) - timedelta(hours=hours)
|
|
lines = deque(maxlen=limit)
|
|
with open(path, "r", encoding="utf-8", errors="replace") as f:
|
|
for line in f:
|
|
dt, msg = _parse_line(line.rstrip())
|
|
if dt is None:
|
|
continue
|
|
if not include_old and dt < since:
|
|
continue
|
|
lines.append((dt, msg))
|
|
return list(lines)
|
|
|
|
|
|
def infer_category(text: str) -> str | None:
|
|
lower = text.lower()
|
|
if "category=" in lower:
|
|
import re
|
|
|
|
m = re.search(r"category=([a-z0-9_-]+)", lower)
|
|
if m:
|
|
return m.group(1)
|
|
if "load" in lower:
|
|
return "load"
|
|
if "docker" in lower:
|
|
return "docker"
|
|
if "restic" in lower or "backup" in lower:
|
|
return "backup"
|
|
if "smart" in lower:
|
|
return "smart"
|
|
if "ssl" in lower or "cert" in lower:
|
|
return "ssl"
|
|
if "npmplus" in lower:
|
|
return "npmplus"
|
|
if "gitea" in lower:
|
|
return "gitea"
|
|
if "openwrt" in lower:
|
|
return "openwrt"
|
|
if "queue" in lower:
|
|
return "queue"
|
|
if "selftest" in lower:
|
|
return "selftest"
|
|
return None
|