Compare commits

..

2 Commits

Author SHA1 Message Date
a221393858 Add audit logging with weekly rotation 2026-02-08 00:54:29 +03:00
61236b9d60 Handle empty docker log periods 2026-02-08 00:54:13 +03:00
7 changed files with 150 additions and 1 deletions

View File

@@ -22,6 +22,12 @@ alerts:
smart_cooldown_sec: 21600
smart_temp_warn: 50
audit:
enabled: true
path: "/var/server-bot/audit.log"
rotate_when: "W0"
backup_count: 8
arcane:
base_url: "http://localhost:3552"
api_key: "arc_..."

View File

@@ -117,6 +117,11 @@ async def logs_options(cb: CallbackQuery):
if action == "tail":
await cb.answer("Loading logs…")
rc, out = await docker_cmd(["logs", "--tail", "80", real])
if rc != 0:
await cb.message.answer(out)
return
if not out.strip():
out = "(no logs)"
await cb.message.answer(
f"📜 **Logs: {alias}**\n```{out}```",
parse_mode="Markdown"
@@ -132,6 +137,11 @@ async def logs_options(cb: CallbackQuery):
since_ts = str(int(time.time() - seconds))
await cb.answer("Loading logs…")
rc, out = await docker_cmd(["logs", "--since", since_ts, "--tail", "200", real])
if rc != 0:
await cb.message.answer(out)
return
if not out.strip():
out = "(no logs for period)"
await cb.message.answer(
f"📜 **Logs: {alias}**\n```{out}```",
parse_mode="Markdown"

View File

@@ -102,6 +102,9 @@ async def log_filter_input(msg: Message):
if rc != 0:
await msg.answer(out, reply_markup=docker_kb)
return
if not out.strip():
await msg.answer("⚠️ Нет логов за выбранный период", reply_markup=docker_kb)
return
lines = [line for line in out.splitlines() if needle.lower() in line.lower()]
filtered = "\n".join(lines) if lines else "(no matches)"

View File

@@ -13,6 +13,7 @@ from services.npmplus import fetch_certificates, format_certificates
import state
from state import UPDATES_CACHE, REBOOT_PENDING
from services.metrics import summarize
from services.audit import read_audit_tail
@dp.message(F.text == "💽 Disks")
@@ -68,6 +69,17 @@ async def metrics(msg: Message):
await msg.answer(summarize(state.METRICS_STORE, minutes=15), reply_markup=system_kb)
@dp.message(F.text == "🧾 Audit")
async def audit_log(msg: Message):
if not is_admin_msg(msg):
return
text = read_audit_tail(cfg, limit=200)
if text.startswith("⚠️") or text.startswith(""):
await msg.answer(text, reply_markup=system_kb)
else:
await msg.answer(text, reply_markup=system_kb, parse_mode="Markdown")
@dp.message(F.text == "🔒 SSL")
async def ssl_certs(msg: Message):
if not is_admin_msg(msg):

View File

@@ -54,7 +54,7 @@ artifacts_kb = ReplyKeyboardMarkup(
system_kb = ReplyKeyboardMarkup(
keyboard=[
[KeyboardButton(text="💽 Disks"), KeyboardButton(text="🔐 Security")],
[KeyboardButton(text="💽 Disks"), KeyboardButton(text="🔐 Security"), KeyboardButton(text="🧾 Audit")],
[KeyboardButton(text="🌐 URLs"), KeyboardButton(text="📈 Metrics"), KeyboardButton(text="🔒 SSL")],
[KeyboardButton(text="📦 Updates"), KeyboardButton(text="⬆️ Upgrade")],
[KeyboardButton(text="🧱 Hardware"), KeyboardButton(text="🔄 Reboot"), KeyboardButton(text="⬅️ Назад")],

View File

@@ -8,6 +8,7 @@ from services.alerts import monitor_resources, monitor_smart
from services.metrics import MetricsStore, start_sampler
from services.queue import worker as queue_worker
from services.notify import notify
from services.audit import AuditMiddleware
import state
import handlers.menu
import handlers.status
@@ -29,6 +30,7 @@ async def notify_start():
async def main():
dp.update.outer_middleware(AuditMiddleware(cfg))
state.DOCKER_MAP.clear()
state.DOCKER_MAP.update(await discover_containers(cfg))
if cfg.get("docker", {}).get("watchdog", True):

116
services/audit.py Normal file
View File

@@ -0,0 +1,116 @@
import logging
import os
from collections import deque
from datetime import datetime, timezone
from logging.handlers import TimedRotatingFileHandler
from typing import Any, Optional
from aiogram import BaseMiddleware
from aiogram.types import CallbackQuery, Message
def _get_audit_path(cfg: dict[str, Any]) -> str:
return cfg.get("audit", {}).get("path", "/var/server-bot/audit.log")
def get_audit_logger(cfg: dict[str, Any]) -> logging.Logger:
logger = logging.getLogger("audit")
if logger.handlers:
return logger
path = _get_audit_path(cfg)
os.makedirs(os.path.dirname(path), exist_ok=True)
rotate_when = cfg.get("audit", {}).get("rotate_when", "W0")
backup_count = int(cfg.get("audit", {}).get("backup_count", 8))
handler = TimedRotatingFileHandler(
path,
when=rotate_when,
interval=1,
backupCount=backup_count,
encoding="utf-8",
utc=True,
)
formatter = logging.Formatter("%(asctime)s\t%(message)s")
handler.setFormatter(formatter)
logger.setLevel(logging.INFO)
logger.addHandler(handler)
logger.propagate = False
return logger
def _user_label(message: Message | CallbackQuery) -> str:
user = message.from_user
if not user:
return "unknown"
parts = [user.username, user.first_name, user.last_name]
label = " ".join(p for p in parts if p)
return label or str(user.id)
def _normalize_action(text: str, limit: int = 200) -> str:
cleaned = " ".join(text.split())
if len(cleaned) > limit:
return cleaned[:limit] + ""
return cleaned
class AuditMiddleware(BaseMiddleware):
def __init__(self, cfg: dict[str, Any]) -> None:
self.cfg = cfg
self.logger = get_audit_logger(cfg)
async def __call__(self, handler, event, data):
if not self.cfg.get("audit", {}).get("enabled", True):
return await handler(event, data)
action: Optional[str] = None
if isinstance(event, Message):
if event.text:
action = _normalize_action(event.text)
elif event.caption:
action = _normalize_action(event.caption)
else:
action = f"<{event.content_type}>"
elif isinstance(event, CallbackQuery):
if event.data:
action = _normalize_action(f"callback:{event.data}")
else:
action = "callback:<empty>"
if action:
chat_id = event.chat.id if isinstance(event, Message) else event.message.chat.id
user_id = event.from_user.id if event.from_user else "unknown"
label = _user_label(event)
self.logger.info(
"user_id=%s\tuser=%s\tchat_id=%s\taction=%s",
user_id,
label,
chat_id,
action,
)
return await handler(event, data)
def read_audit_tail(cfg: dict[str, Any], limit: int = 200) -> str:
path = _get_audit_path(cfg)
if not os.path.exists(path):
return "⚠️ Audit log not found"
lines = deque(maxlen=limit)
with open(path, "r", encoding="utf-8", errors="replace") as f:
for line in f:
lines.append(line.rstrip())
if not lines:
return " Audit log is empty"
header = f"🧾 Audit log ({datetime.now(timezone.utc):%Y-%m-%d %H:%M UTC})"
body = "\n".join(lines)
max_body = 3500
if len(body) > max_body:
body = body[-max_body:]
body = "...(truncated)\n" + body
return f"{header}\n```\n{body}\n```"