From b0a44136715f8b2027f32ade0fcc11d4cdc0fbac Mon Sep 17 00:00:00 2001 From: benya Date: Mon, 9 Feb 2026 01:14:37 +0300 Subject: [PATCH] Add runtime state, auto-mute schedules, and backup retries --- CONFIG.en.md | 5 +++ CONFIG.md | 5 +++ app.py | 7 ++-- config.example.yaml | 6 ++++ handlers/backup.py | 68 +++++++++++++++++++++++++++++++++++++-- handlers/help.py | 8 +++-- handlers/status.py | 42 ++++++++++++++++++++++++ handlers/system.py | 16 +++++++++ keyboards.py | 3 +- services/alert_mute.py | 67 +++++++++++++++++++++++++++++++++----- services/notify.py | 4 ++- services/openwrt.py | 2 ++ services/queue.py | 44 +++++++++++++++++++++++++ services/runtime_state.py | 52 ++++++++++++++++++++++++++++++ 14 files changed, 312 insertions(+), 17 deletions(-) create mode 100644 services/runtime_state.py diff --git a/CONFIG.en.md b/CONFIG.en.md index 22a1064..3ee742d 100644 --- a/CONFIG.en.md +++ b/CONFIG.en.md @@ -11,6 +11,7 @@ This project uses `config.yaml`. Start from `config.example.yaml`. ## paths - `artifact_state` (string): JSON file for artifact state. +- `runtime_state` (string): File for runtime state (mutes, metrics, etc.). - `restic_env` (string): Path to a file with RESTIC_* environment variables. ## thresholds @@ -31,6 +32,10 @@ This project uses `config.yaml`. Start from `config.example.yaml`. - `start` (string): Start time `HH:MM` (e.g. `23:00`). - `end` (string): End time `HH:MM` (e.g. `08:00`). - `allow_critical` (bool): Allow critical alerts during quiet hours. +- `auto_mute` (list): Per-category auto mutes by time window. + - `category` (string): load/disk/smart/ssl/docker/test. + - `start` (string): Start `HH:MM`. + - `end` (string): End `HH:MM` (can wrap over midnight). - `notify_recovery` (bool): Send recovery notifications. - `smart_enabled` (bool): Enable SMART health polling. - `smart_interval_sec` (int): SMART poll interval. diff --git a/CONFIG.md b/CONFIG.md index f64054f..4cd87e8 100644 --- a/CONFIG.md +++ b/CONFIG.md @@ -11,6 +11,7 @@ ## paths - `artifact_state` (string): JSON файл состояния артефактов. +- `runtime_state` (string): файл с runtime-состоянием (мьюты, метрики и т.п.). - `restic_env` (string): путь к файлу с RESTIC_* переменными. ## thresholds @@ -31,6 +32,10 @@ - `start` (string): начало, формат `HH:MM` (например `23:00`). - `end` (string): конец, формат `HH:MM` (например `08:00`). - `allow_critical` (bool): слать критичные алерты в тишину. +- `auto_mute` (list): авто‑мьюты по категориям и времени. + - `category` (string): load/disk/smart/ssl/docker/test. + - `start` (string): начало `HH:MM`. + - `end` (string): конец `HH:MM` (интервал может пересекать ночь). - `notify_recovery` (bool): уведомлять о восстановлении. - `smart_enabled` (bool): SMART проверки. - `smart_interval_sec` (int): интервал SMART. diff --git a/app.py b/app.py index 61f650c..758afcf 100644 --- a/app.py +++ b/app.py @@ -1,5 +1,6 @@ from aiogram import Bot, Dispatcher from config import load_cfg, load_env +from services import runtime_state cfg = load_cfg() @@ -12,8 +13,10 @@ else: ADMIN_ID = int(cfg["telegram"]["admin_id"]) ADMIN_IDS = [ADMIN_ID] -ARTIFACT_STATE = cfg["paths"]["artifact_state"] -RESTIC_ENV = load_env(cfg["paths"].get("restic_env", "/etc/restic/restic.env")) +paths_cfg = cfg.get("paths", {}) +runtime_state.configure(paths_cfg.get("runtime_state", "/var/server-bot/runtime.json")) +ARTIFACT_STATE = paths_cfg["artifact_state"] +RESTIC_ENV = load_env(paths_cfg.get("restic_env", "/etc/restic/restic.env")) DISK_WARN = int(cfg.get("thresholds", {}).get("disk_warn", 80)) LOAD_WARN = float(cfg.get("thresholds", {}).get("load_warn", 2.0)) diff --git a/config.example.yaml b/config.example.yaml index c42deac..67980bd 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -8,6 +8,7 @@ telegram: paths: # JSON state file for artifacts artifact_state: "/opt/tg-bot/state.json" + runtime_state: "/var/server-bot/runtime.json" # Optional env file with RESTIC_* variables restic_env: "/etc/restic/restic.env" @@ -24,6 +25,11 @@ alerts: notify_cooldown_sec: 900 # If true, only critical load alerts are sent (no warn/OK) load_only_critical: false + # Optional auto-mute windows per category + auto_mute: + - category: "load" + start: "23:00" + end: "08:00" quiet_hours: enabled: false start: "23:00" diff --git a/handlers/backup.py b/handlers/backup.py index 87333cd..67a7e33 100644 --- a/handlers/backup.py +++ b/handlers/backup.py @@ -63,6 +63,17 @@ def _format_backup_result(rc: int, out: str) -> str: return base +def _tail(path: str, lines: int = 120) -> str: + if not os.path.exists(path): + return f"⚠️ Log not found: {path}" + try: + with open(path, "r", encoding="utf-8", errors="replace") as f: + data = f.readlines()[-lines:] + except Exception as e: + return f"⚠️ Failed to read log: {e}" + return "".join(data).strip() or "(empty)" + + def _load_json(raw: str, label: str) -> tuple[bool, object | None, str]: if not raw or not raw.strip(): return False, None, f"? {label} returned empty output" @@ -225,6 +236,10 @@ async def cmd_backup_status(msg: Message): async def cmd_backup_now(msg: Message): + await schedule_backup(msg) + + +async def schedule_backup(msg: Message): async def job(): if cfg.get("safety", {}).get("dry_run", False): await msg.answer("🧪 Dry-run: backup skipped", reply_markup=backup_kb) @@ -241,7 +256,14 @@ async def cmd_backup_now(msg: Message): use_restic_env=True, timeout=6 * 3600, ) - await msg.answer(_format_backup_result(rc, out), reply_markup=backup_kb) + kb = backup_kb + if rc != 0: + kb = InlineKeyboardMarkup( + inline_keyboard=[ + [InlineKeyboardButton(text="🔁 Retry backup", callback_data="backup:retry")] + ] + ) + await msg.answer(_format_backup_result(rc, out), reply_markup=kb) finally: release_lock("backup") @@ -352,7 +374,7 @@ async def br(msg: Message): @dp.message(F.text == "/backup_run") async def br_cmd(msg: Message): if is_admin_msg(msg): - await cmd_backup_now(msg) + await schedule_backup(msg) @dp.message(F.text == "🧪 Restic check") @@ -367,7 +389,14 @@ async def rc(msg: Message): use_restic_env=True, timeout=6 * 3600, ) - await msg.answer(("✅ OK\n" if rc2 == 0 else "❌ FAIL\n") + out, reply_markup=backup_kb) + kb = backup_kb + if rc2 != 0: + kb = InlineKeyboardMarkup( + inline_keyboard=[ + [InlineKeyboardButton(text="🔁 Retry restic check", callback_data="backup:retry_check")] + ] + ) + await msg.answer(("✅ OK\n" if rc2 == 0 else "❌ FAIL\n") + out, reply_markup=kb) pos = await enqueue("restic-check", job) await msg.answer(f"🕓 Restic check queued (#{pos})", reply_markup=backup_kb) @@ -395,3 +424,36 @@ async def wr(msg: Message): async def rh(msg: Message): if is_admin_msg(msg): await msg.answer(restore_help(), reply_markup=backup_kb) + + +@dp.message(F.text == "📜 History") +@dp.message(F.text == "/backup_history") +async def backup_history(msg: Message): + if not is_admin_msg(msg): + return + log_path = "/var/log/backup-auto.log" + content = _tail(log_path, lines=160) + if content.startswith("⚠️"): + await msg.answer(content, reply_markup=backup_kb) + return + await msg.answer( + f"📜 Backup history (tail)\n`{log_path}`\n```\n{content}\n```", + reply_markup=backup_kb, + parse_mode="Markdown", + ) + + +@dp.callback_query(F.data == "backup:retry") +async def backup_retry(cb: CallbackQuery): + if not is_admin_cb(cb): + return + await cb.answer("Queuing backup…") + await schedule_backup(cb.message) + + +@dp.callback_query(F.data == "backup:retry_check") +async def backup_retry_check(cb: CallbackQuery): + if not is_admin_cb(cb): + return + await cb.answer("Queuing restic check…") + await rc(cb.message) diff --git a/handlers/help.py b/handlers/help.py index 1a3ba1f..d03087c 100644 --- a/handlers/help.py +++ b/handlers/help.py @@ -13,6 +13,7 @@ HELP_PAGES = [ "📊 *Статус* — общая загрузка.\n" "📋 */status_short* — кратко (load/RAM/диски).\n" "🩺 */health_short* — краткий health.\n" + "🧪 */selftest* — health + restic snapshot probe.\n" "🔧 Разделы: Docker, Backup, Artifacts, System, OpenWrt.", ), ( @@ -25,13 +26,16 @@ HELP_PAGES = [ "Шорткаты: `/alerts_list`, `/alerts_recent`, `/alerts_mute_load` (60м).\n" "Категории: load, disk, smart, ssl, docker, test.\n" "Quiet hours: `alerts.quiet_hours` для не‑критичных.\n" + "Авто-мьют: `alerts.auto_mute` со слотами времени.\n" "Только красные load: `alerts.load_only_critical: true`.\n" "Валидатор конфига: `/config_check`.", ), ( "Backup", "💾 **Backup (restic)**\n\n" - "Кнопки: Status, Last snapshot, Repo stats, Run backup, Queue, Restic check, Weekly report.\n" + "Кнопки: Status, Last snapshot, Repo stats, Run backup, Queue, Restic check, Weekly report, History.\n" + "History — хвост `/var/log/backup-auto.log`.\n" + "Fail → кнопка Retry (backup/check).\n" "Run backup/Check учитывают `safety.dry_run`.\n" "После бэкапа приходит TL;DR + путь к логу `/var/log/backup-auto.log`.\n" "Queue → Details показывает отложенные задачи.", @@ -45,7 +49,7 @@ HELP_PAGES = [ "Info: Disks/Security/Metrics/Hardware/SMART/OpenWrt.\n" "Ops: Updates/Upgrade/Reboot.\n" "Logs: Audit/Incidents/Security/Integrations/Processes.\n" - "OpenWrt: `/openwrt`, `/openwrt_wan`, `/openwrt_clients`.", + "OpenWrt: `/openwrt`, `/openwrt_wan`, `/openwrt_clients`, `/openwrt_leases`.", ), ( "Admin", diff --git a/handlers/status.py b/handlers/status.py index e2a967d..35c1ddb 100644 --- a/handlers/status.py +++ b/handlers/status.py @@ -1,4 +1,5 @@ import asyncio +import json import socket import time import psutil @@ -10,6 +11,7 @@ from keyboards import menu_kb from services.system import format_disks from services.health import health from state import DOCKER_MAP +from services.runner import run_cmd_full async def cmd_status(msg: Message): @@ -115,6 +117,46 @@ async def health_short(msg: Message): await msg.answer(f"🩺 Health (short)\n{brief}", reply_markup=menu_kb) +@dp.message(F.text.in_({"🧪 Self-test", "/selftest"})) +async def selftest(msg: Message): + if not is_admin_msg(msg): + return + + await msg.answer("⏳ Self-test…", reply_markup=menu_kb) + + async def worker(): + lines = ["🧪 Self-test"] + # health + try: + htext = await asyncio.to_thread(health, cfg, DOCKER_MAP) + h_lines = [ln for ln in htext.splitlines() if ln.strip()] + brief = " | ".join(h_lines[1:5]) if len(h_lines) > 1 else h_lines[0] if h_lines else "n/a" + lines.append(f"🟢 Health: {brief}") + except Exception as e: + lines.append(f"🔴 Health failed: {e}") + + # restic snapshots check + rc, out = await run_cmd_full(["restic", "snapshots", "--json"], use_restic_env=True, timeout=40) + if rc == 0: + try: + snaps = json.loads(out) + if isinstance(snaps, list) and snaps: + snaps.sort(key=lambda s: s.get("time", ""), reverse=True) + last = snaps[0] + t = last.get("time", "?").replace("Z", "").replace("T", " ")[:16] + lines.append(f"🟢 Restic snapshots: {len(snaps)}, last {t}") + else: + lines.append("🟡 Restic snapshots: empty") + except Exception: + lines.append("🟡 Restic snapshots: invalid JSON") + else: + lines.append(f"🔴 Restic snapshots error: {out.strip() or rc}") + + await msg.answer("\n".join(lines), reply_markup=menu_kb) + + asyncio.create_task(worker()) + + def _rate_str(value: float) -> str: if value >= 1024 * 1024: return f"{value / (1024 * 1024):.2f} MiB/s" diff --git a/handlers/system.py b/handlers/system.py index cfc907d..eef37dc 100644 --- a/handlers/system.py +++ b/handlers/system.py @@ -251,6 +251,22 @@ async def openwrt_clients(msg: Message): asyncio.create_task(worker()) +@dp.message(F.text == "/openwrt_leases") +async def openwrt_leases(msg: Message): + if not is_admin_msg(msg): + return + await msg.answer("⏳ Checking OpenWrt leases…", reply_markup=system_info_kb) + + async def worker(): + try: + text = await get_openwrt_status(cfg, mode="leases") + except Exception as e: + text = f"⚠️ OpenWrt error: {e}" + await msg.answer(text, reply_markup=system_info_kb) + + asyncio.create_task(worker()) + + @dp.message(F.text == "🧾 Audit") async def audit_log(msg: Message): if not is_admin_msg(msg): diff --git a/keyboards.py b/keyboards.py index 99142df..c199bb1 100644 --- a/keyboards.py +++ b/keyboards.py @@ -38,7 +38,8 @@ backup_kb = ReplyKeyboardMarkup( [KeyboardButton(text="📦 Status"), KeyboardButton(text="📦 Last snapshot")], [KeyboardButton(text="📊 Repo stats"), KeyboardButton(text="🧯 Restore help")], [KeyboardButton(text="▶️ Run backup"), KeyboardButton(text="🧾 Queue")], - [KeyboardButton(text="🧪 Restic check"), KeyboardButton(text="📬 Weekly report"), KeyboardButton(text="⬅️ Назад")], + [KeyboardButton(text="🧪 Restic check"), KeyboardButton(text="📬 Weekly report"), KeyboardButton(text="📜 History")], + [KeyboardButton(text="⬅️ Назад")], ], resize_keyboard=True, ) diff --git a/services/alert_mute.py b/services/alert_mute.py index 131c61c..cd1ac44 100644 --- a/services/alert_mute.py +++ b/services/alert_mute.py @@ -1,37 +1,53 @@ import time from typing import Dict +from services.runtime_state import get_state, set_state # category -> unix timestamp until muted -_MUTES: Dict[str, float] = {} + + +def _mutes() -> Dict[str, float]: + return get_state().get("mutes", {}) + + +def _save(mutes: Dict[str, float]): + set_state("mutes", mutes) def _cleanup() -> None: + mutes = _mutes() now = time.time() - expired = [k for k, until in _MUTES.items() if until <= now] + expired = [k for k, until in mutes.items() if until <= now] for k in expired: - _MUTES.pop(k, None) + mutes.pop(k, None) + _save(mutes) def set_mute(category: str, seconds: int) -> float: _cleanup() + mutes = _mutes() until = time.time() + max(0, seconds) - _MUTES[category] = until + mutes[category] = until + _save(mutes) return until def clear_mute(category: str) -> None: - _MUTES.pop(category, None) + mutes = _mutes() + mutes.pop(category, None) + _save(mutes) def is_muted(category: str | None) -> bool: if not category: return False _cleanup() - until = _MUTES.get(category) + mutes = _mutes() + until = mutes.get(category) if until is None: return False if until <= time.time(): - _MUTES.pop(category, None) + mutes.pop(category, None) + _save(mutes) return False return True @@ -39,4 +55,39 @@ def is_muted(category: str | None) -> bool: def list_mutes() -> dict[str, int]: _cleanup() now = time.time() - return {k: int(until - now) for k, until in _MUTES.items()} + mutes = _mutes() + return {k: int(until - now) for k, until in mutes.items()} + + +def is_auto_muted(cfg: dict, category: str | None) -> bool: + if not category: + return False + auto_list = cfg.get("alerts", {}).get("auto_mute", []) + if not isinstance(auto_list, list): + return False + now = time.localtime() + now_minutes = now.tm_hour * 60 + now.tm_min + for item in auto_list: + if not isinstance(item, dict): + continue + cat = item.get("category") + if cat != category: + continue + start = item.get("start", "00:00") + end = item.get("end", "00:00") + try: + sh, sm = [int(x) for x in start.split(":")] + eh, em = [int(x) for x in end.split(":")] + except Exception: + continue + start_min = sh * 60 + sm + end_min = eh * 60 + em + if start_min == end_min: + continue + if start_min < end_min: + if start_min <= now_minutes < end_min: + return True + else: + if now_minutes >= start_min or now_minutes < end_min: + return True + return False diff --git a/services/notify.py b/services/notify.py index a1d200c..61d7cd2 100644 --- a/services/notify.py +++ b/services/notify.py @@ -2,7 +2,7 @@ import time from datetime import datetime from aiogram import Bot from app import cfg -from services.alert_mute import is_muted +from services.alert_mute import is_muted, is_auto_muted from services.incidents import log_incident @@ -49,6 +49,8 @@ async def notify( alerts_cfg = cfg.get("alerts", {}) if category and is_muted(category): return + if category and is_auto_muted(cfg, category): + return if _in_quiet_hours(alerts_cfg): allow_critical = bool(alerts_cfg.get("quiet_hours", {}).get("allow_critical", True)) if not (allow_critical and level == "critical"): diff --git a/services/openwrt.py b/services/openwrt.py index be2218b..dfbe324 100644 --- a/services/openwrt.py +++ b/services/openwrt.py @@ -447,4 +447,6 @@ async def get_openwrt_status(cfg: dict[str, Any], mode: str = "full") -> str: return "\n".join(header) if mode == "clients": return "\n".join(header + wifi_section) + if mode == "leases": + return "\n".join(header + lease_section) return "\n".join(header + wifi_section + lease_section) diff --git a/services/queue.py b/services/queue.py index eb89c73..d2bfb07 100644 --- a/services/queue.py +++ b/services/queue.py @@ -2,12 +2,24 @@ import asyncio import time from collections import deque from typing import Awaitable, Callable, Any +from services import runtime_state _queue: asyncio.Queue = asyncio.Queue() _current_label: str | None = None _current_meta: dict[str, Any] | None = None _pending: deque[tuple[str, float]] = deque() +_stats: dict[str, Any] = runtime_state.get("queue_stats", {}) or { + "processed": 0, + "avg_wait_sec": 0.0, + "avg_runtime_sec": 0.0, + "last_label": "", + "last_finished_at": 0.0, +} + + +def _save_stats(): + runtime_state.set_state("queue_stats", _stats) async def enqueue(label: str, job: Callable[[], Awaitable[None]]) -> int: @@ -34,6 +46,21 @@ async def worker(): try: await job() finally: + finished_at = time.time() + if _current_meta: + wait_sec = max(0.0, _current_meta["started_at"] - _current_meta["enqueued_at"]) + runtime_sec = max(0.0, finished_at - _current_meta["started_at"]) + n_prev = int(_stats.get("processed", 0)) + _stats["processed"] = n_prev + 1 + _stats["avg_wait_sec"] = ( + (_stats.get("avg_wait_sec", 0.0) * n_prev) + wait_sec + ) / _stats["processed"] + _stats["avg_runtime_sec"] = ( + (_stats.get("avg_runtime_sec", 0.0) * n_prev) + runtime_sec + ) / _stats["processed"] + _stats["last_label"] = label + _stats["last_finished_at"] = finished_at + _save_stats() _current_label = None _current_meta = None _queue.task_done() @@ -47,6 +74,12 @@ def format_status() -> str: if pending: preview = ", ".join([p[0] for p in pending[:5]]) lines.append(f"➡️ Next: {preview}") + if _stats.get("processed"): + lines.append( + f"📈 Done: {_stats.get('processed')} | " + f"avg wait {int(_stats.get('avg_wait_sec', 0))}s | " + f"avg run {int(_stats.get('avg_runtime_sec', 0))}s" + ) return "\n".join(lines) @@ -67,4 +100,15 @@ def format_details(limit: int = 10) -> str: for i, (label, enqueued_at) in enumerate(pending[:limit], start=1): wait = int(now - enqueued_at) lines.append(f"{i:>3} | {label} | {wait}s") + if _stats.get("processed"): + lines.append("") + lines.append( + "📈 Stats: " + f"{_stats.get('processed')} done, " + f"avg wait {int(_stats.get('avg_wait_sec', 0))}s, " + f"avg run {int(_stats.get('avg_runtime_sec', 0))}s" + ) + last_label = _stats.get("last_label") + if last_label: + lines.append(f"Last: {last_label}") return "\n".join(lines) diff --git a/services/runtime_state.py b/services/runtime_state.py new file mode 100644 index 0000000..e4a409f --- /dev/null +++ b/services/runtime_state.py @@ -0,0 +1,52 @@ +import json +import os +from typing import Any, Dict + +_PATH = "/var/server-bot/runtime.json" +_STATE: Dict[str, Any] = {} + + +def configure(path: str | None): + global _PATH + if path: + _PATH = path + + +def _load_from_disk(): + global _STATE + if not os.path.exists(_PATH): + _STATE = {} + return + try: + with open(_PATH, "r", encoding="utf-8") as f: + _STATE = json.load(f) + except Exception: + _STATE = {} + + +def _save(): + os.makedirs(os.path.dirname(_PATH), exist_ok=True) + try: + with open(_PATH, "w", encoding="utf-8") as f: + json.dump(_STATE, f) + except Exception: + pass + + +def get_state() -> Dict[str, Any]: + if not _STATE: + _load_from_disk() + return _STATE + + +def set_state(key: str, value: Any): + if not _STATE: + _load_from_disk() + _STATE[key] = value + _save() + + +def get(key: str, default: Any = None) -> Any: + if not _STATE: + _load_from_disk() + return _STATE.get(key, default)