From a22139385877af18f8a82105ec1f57abd056d07a Mon Sep 17 00:00:00 2001 From: benya Date: Sun, 8 Feb 2026 00:54:29 +0300 Subject: [PATCH] Add audit logging with weekly rotation --- config.example.yaml | 6 +++ handlers/system.py | 12 +++++ keyboards.py | 2 +- main.py | 2 + services/audit.py | 116 ++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 137 insertions(+), 1 deletion(-) create mode 100644 services/audit.py diff --git a/config.example.yaml b/config.example.yaml index 5db000c..d258298 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -22,6 +22,12 @@ alerts: smart_cooldown_sec: 21600 smart_temp_warn: 50 +audit: + enabled: true + path: "/var/server-bot/audit.log" + rotate_when: "W0" + backup_count: 8 + arcane: base_url: "http://localhost:3552" api_key: "arc_..." diff --git a/handlers/system.py b/handlers/system.py index 489c726..b76236a 100644 --- a/handlers/system.py +++ b/handlers/system.py @@ -13,6 +13,7 @@ from services.npmplus import fetch_certificates, format_certificates import state from state import UPDATES_CACHE, REBOOT_PENDING from services.metrics import summarize +from services.audit import read_audit_tail @dp.message(F.text == "๐Ÿ’ฝ Disks") @@ -68,6 +69,17 @@ async def metrics(msg: Message): await msg.answer(summarize(state.METRICS_STORE, minutes=15), reply_markup=system_kb) +@dp.message(F.text == "๐Ÿงพ Audit") +async def audit_log(msg: Message): + if not is_admin_msg(msg): + return + text = read_audit_tail(cfg, limit=200) + if text.startswith("โš ๏ธ") or text.startswith("โ„น๏ธ"): + await msg.answer(text, reply_markup=system_kb) + else: + await msg.answer(text, reply_markup=system_kb, parse_mode="Markdown") + + @dp.message(F.text == "๐Ÿ”’ SSL") async def ssl_certs(msg: Message): if not is_admin_msg(msg): diff --git a/keyboards.py b/keyboards.py index f4afbb0..cb7fd3c 100644 --- a/keyboards.py +++ b/keyboards.py @@ -54,7 +54,7 @@ artifacts_kb = ReplyKeyboardMarkup( system_kb = ReplyKeyboardMarkup( keyboard=[ - [KeyboardButton(text="๐Ÿ’ฝ Disks"), KeyboardButton(text="๐Ÿ” Security")], + [KeyboardButton(text="๐Ÿ’ฝ Disks"), KeyboardButton(text="๐Ÿ” Security"), KeyboardButton(text="๐Ÿงพ Audit")], [KeyboardButton(text="๐ŸŒ URLs"), KeyboardButton(text="๐Ÿ“ˆ Metrics"), KeyboardButton(text="๐Ÿ”’ SSL")], [KeyboardButton(text="๐Ÿ“ฆ Updates"), KeyboardButton(text="โฌ†๏ธ Upgrade")], [KeyboardButton(text="๐Ÿงฑ Hardware"), KeyboardButton(text="๐Ÿ”„ Reboot"), KeyboardButton(text="โฌ…๏ธ ะะฐะทะฐะด")], diff --git a/main.py b/main.py index ad93b74..95479ec 100644 --- a/main.py +++ b/main.py @@ -8,6 +8,7 @@ from services.alerts import monitor_resources, monitor_smart from services.metrics import MetricsStore, start_sampler from services.queue import worker as queue_worker from services.notify import notify +from services.audit import AuditMiddleware import state import handlers.menu import handlers.status @@ -29,6 +30,7 @@ async def notify_start(): async def main(): + dp.update.outer_middleware(AuditMiddleware(cfg)) state.DOCKER_MAP.clear() state.DOCKER_MAP.update(await discover_containers(cfg)) if cfg.get("docker", {}).get("watchdog", True): diff --git a/services/audit.py b/services/audit.py new file mode 100644 index 0000000..e209699 --- /dev/null +++ b/services/audit.py @@ -0,0 +1,116 @@ +import logging +import os +from collections import deque +from datetime import datetime, timezone +from logging.handlers import TimedRotatingFileHandler +from typing import Any, Optional + +from aiogram import BaseMiddleware +from aiogram.types import CallbackQuery, Message + + +def _get_audit_path(cfg: dict[str, Any]) -> str: + return cfg.get("audit", {}).get("path", "/var/server-bot/audit.log") + + +def get_audit_logger(cfg: dict[str, Any]) -> logging.Logger: + logger = logging.getLogger("audit") + if logger.handlers: + return logger + + path = _get_audit_path(cfg) + os.makedirs(os.path.dirname(path), exist_ok=True) + + rotate_when = cfg.get("audit", {}).get("rotate_when", "W0") + backup_count = int(cfg.get("audit", {}).get("backup_count", 8)) + handler = TimedRotatingFileHandler( + path, + when=rotate_when, + interval=1, + backupCount=backup_count, + encoding="utf-8", + utc=True, + ) + formatter = logging.Formatter("%(asctime)s\t%(message)s") + handler.setFormatter(formatter) + + logger.setLevel(logging.INFO) + logger.addHandler(handler) + logger.propagate = False + return logger + + +def _user_label(message: Message | CallbackQuery) -> str: + user = message.from_user + if not user: + return "unknown" + parts = [user.username, user.first_name, user.last_name] + label = " ".join(p for p in parts if p) + return label or str(user.id) + + +def _normalize_action(text: str, limit: int = 200) -> str: + cleaned = " ".join(text.split()) + if len(cleaned) > limit: + return cleaned[:limit] + "โ€ฆ" + return cleaned + + +class AuditMiddleware(BaseMiddleware): + def __init__(self, cfg: dict[str, Any]) -> None: + self.cfg = cfg + self.logger = get_audit_logger(cfg) + + async def __call__(self, handler, event, data): + if not self.cfg.get("audit", {}).get("enabled", True): + return await handler(event, data) + + action: Optional[str] = None + if isinstance(event, Message): + if event.text: + action = _normalize_action(event.text) + elif event.caption: + action = _normalize_action(event.caption) + else: + action = f"<{event.content_type}>" + elif isinstance(event, CallbackQuery): + if event.data: + action = _normalize_action(f"callback:{event.data}") + else: + action = "callback:" + + if action: + chat_id = event.chat.id if isinstance(event, Message) else event.message.chat.id + user_id = event.from_user.id if event.from_user else "unknown" + label = _user_label(event) + self.logger.info( + "user_id=%s\tuser=%s\tchat_id=%s\taction=%s", + user_id, + label, + chat_id, + action, + ) + + return await handler(event, data) + + +def read_audit_tail(cfg: dict[str, Any], limit: int = 200) -> str: + path = _get_audit_path(cfg) + if not os.path.exists(path): + return "โš ๏ธ Audit log not found" + + lines = deque(maxlen=limit) + with open(path, "r", encoding="utf-8", errors="replace") as f: + for line in f: + lines.append(line.rstrip()) + + if not lines: + return "โ„น๏ธ Audit log is empty" + + header = f"๐Ÿงพ Audit log ({datetime.now(timezone.utc):%Y-%m-%d %H:%M UTC})" + body = "\n".join(lines) + max_body = 3500 + if len(body) > max_body: + body = body[-max_body:] + body = "...(truncated)\n" + body + return f"{header}\n```\n{body}\n```"