From 75113b61821f06f2ae0d74cdb9e59960fa81787d Mon Sep 17 00:00:00 2001 From: benya Date: Mon, 9 Feb 2026 01:56:27 +0300 Subject: [PATCH] Add selftest scheduler, queue history, and OpenWrt signal stats --- CONFIG.en.md | 6 ++++ CONFIG.md | 6 ++++ config.example.yaml | 7 +++++ handlers/help.py | 33 ++++++++++++++++++++++ handlers/status.py | 31 ++------------------- handlers/system.py | 44 +++++++++++++++++++++++++++++ main.py | 2 ++ services/alerts.py | 5 ++++ services/openwrt.py | 32 +++++++++++++-------- services/queue.py | 23 +++++++++++++++ services/selftest.py | 66 ++++++++++++++++++++++++++++++++++++++++++++ 11 files changed, 216 insertions(+), 39 deletions(-) create mode 100644 services/selftest.py diff --git a/CONFIG.en.md b/CONFIG.en.md index 3ee742d..e3314ad 100644 --- a/CONFIG.en.md +++ b/CONFIG.en.md @@ -36,6 +36,7 @@ This project uses `config.yaml`. Start from `config.example.yaml`. - `category` (string): load/disk/smart/ssl/docker/test. - `start` (string): Start `HH:MM`. - `end` (string): End `HH:MM` (can wrap over midnight). +- `auto_mute_on_high_load_sec` (int): auto-mute `load` category for N seconds on critical load (0 disables). - `notify_recovery` (bool): Send recovery notifications. - `smart_enabled` (bool): Enable SMART health polling. - `smart_interval_sec` (int): SMART poll interval. @@ -82,6 +83,11 @@ This project uses `config.yaml`. Start from `config.example.yaml`. - `weekly.day` (string): Weekday `Mon`..`Sun` (default `Sun`). - `weekly.time` (string): Local time `HH:MM` (default `08:00`). +## selftest + +- `schedule.enabled` (bool): Enable auto self-test. +- `schedule.time` (string): Local time `HH:MM` (default `03:30`). + ## external_checks - `enabled` (bool): Enable background checks. diff --git a/CONFIG.md b/CONFIG.md index 4cd87e8..e4ad9f6 100644 --- a/CONFIG.md +++ b/CONFIG.md @@ -36,6 +36,7 @@ - `category` (string): load/disk/smart/ssl/docker/test. - `start` (string): начало `HH:MM`. - `end` (string): конец `HH:MM` (интервал может пересекать ночь). +- `auto_mute_on_high_load_sec` (int): при critical load автоматически мьютить категорию `load` на N секунд (0 — выкл). - `notify_recovery` (bool): уведомлять о восстановлении. - `smart_enabled` (bool): SMART проверки. - `smart_interval_sec` (int): интервал SMART. @@ -82,6 +83,11 @@ - `weekly.day` (string): день недели (`Mon`..`Sun`), по умолчанию `Sun`. - `weekly.time` (string): локальное время `HH:MM`, по умолчанию `08:00`. +## selftest + +- `schedule.enabled` (bool): включить авто self-test. +- `schedule.time` (string): локальное время `HH:MM`, по умолчанию `03:30`. + ## external_checks - `enabled` (bool): включить фоновые проверки. diff --git a/config.example.yaml b/config.example.yaml index 67980bd..a35f81f 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -30,6 +30,8 @@ alerts: - category: "load" start: "23:00" end: "08:00" + # Auto-mute load when critical load fires (seconds) + auto_mute_on_high_load_sec: 600 quiet_hours: enabled: false start: "23:00" @@ -78,6 +80,11 @@ reports: day: "Sun" # Mon/Tue/Wed/Thu/Fri/Sat/Sun time: "08:00" # HH:MM server local time +selftest: + schedule: + enabled: false + time: "03:30" + external_checks: enabled: true state_path: "/var/server-bot/external_checks.json" diff --git a/handlers/help.py b/handlers/help.py index d03087c..5db9ba8 100644 --- a/handlers/help.py +++ b/handlers/help.py @@ -56,6 +56,9 @@ HELP_PAGES = [ "🛠 **Admin & Deploy**\n\n" "Config: `/config_check`, файл `config.yaml` (см. config.example.yaml).\n" "Deploy: `deploy.sh` (ssh 10.10.10.10:1090 → git pull → systemctl restart tg-bot).\n" + "Incidents summary: `/incidents_summary`.\n" + "Disk snapshot: `/disk_snapshot`.\n" + "BotFather list: `/botfather_list`.\n" "Безопасность: `safety.dry_run: true` блокирует опасные действия.\n" "OpenWrt: кнопка в System → Info.", ), @@ -110,3 +113,33 @@ async def help_cb(cb: CallbackQuery): parse_mode="Markdown", ) await cb.answer() + + +BOTFATHER_LIST = """\ +help - Show help pages +status_short - Compact host status +health_short - Compact health report +selftest - Health + restic snapshot probe +alerts - Manage alerts +alerts_list - List active mutes +alerts_recent - Show recent incidents (24h) +alerts_mute_load - Mute load alerts for 60m +backup_run - Run backup (queued) +backup_history - Show backup log tail +docker_status - Docker summary +docker_health - Docker inspect/health by alias +openwrt - Full OpenWrt status +openwrt_wan - OpenWrt WAN only +openwrt_clients - OpenWrt wifi clients +openwrt_leases - OpenWrt DHCP leases +incidents_summary - Incidents counters (24h/7d) +disk_snapshot - Disk usage snapshot +config_check - Validate config +""" + + +@dp.message(F.text == "/botfather_list") +async def botfather_list(msg: Message): + if not is_admin_msg(msg): + return + await msg.answer(f"Commands for BotFather:\n```\n{BOTFATHER_LIST}\n```", parse_mode="Markdown") diff --git a/handlers/status.py b/handlers/status.py index 35c1ddb..6823e4a 100644 --- a/handlers/status.py +++ b/handlers/status.py @@ -12,6 +12,7 @@ from services.system import format_disks from services.health import health from state import DOCKER_MAP from services.runner import run_cmd_full +from services.selftest import run_selftest async def cmd_status(msg: Message): @@ -125,34 +126,8 @@ async def selftest(msg: Message): await msg.answer("⏳ Self-test…", reply_markup=menu_kb) async def worker(): - lines = ["🧪 Self-test"] - # health - try: - htext = await asyncio.to_thread(health, cfg, DOCKER_MAP) - h_lines = [ln for ln in htext.splitlines() if ln.strip()] - brief = " | ".join(h_lines[1:5]) if len(h_lines) > 1 else h_lines[0] if h_lines else "n/a" - lines.append(f"🟢 Health: {brief}") - except Exception as e: - lines.append(f"🔴 Health failed: {e}") - - # restic snapshots check - rc, out = await run_cmd_full(["restic", "snapshots", "--json"], use_restic_env=True, timeout=40) - if rc == 0: - try: - snaps = json.loads(out) - if isinstance(snaps, list) and snaps: - snaps.sort(key=lambda s: s.get("time", ""), reverse=True) - last = snaps[0] - t = last.get("time", "?").replace("Z", "").replace("T", " ")[:16] - lines.append(f"🟢 Restic snapshots: {len(snaps)}, last {t}") - else: - lines.append("🟡 Restic snapshots: empty") - except Exception: - lines.append("🟡 Restic snapshots: invalid JSON") - else: - lines.append(f"🔴 Restic snapshots error: {out.strip() or rc}") - - await msg.answer("\n".join(lines), reply_markup=menu_kb) + text = await run_selftest(cfg, DOCKER_MAP) + await msg.answer(text, reply_markup=menu_kb) asyncio.create_task(worker()) diff --git a/handlers/system.py b/handlers/system.py index eef37dc..484e0fa 100644 --- a/handlers/system.py +++ b/handlers/system.py @@ -19,12 +19,14 @@ from services.runner import run_cmd from services.npmplus import fetch_certificates, format_certificates, list_proxy_hosts, set_proxy_host from services.gitea import get_gitea_health from services.openwrt import get_openwrt_status +from services.system import worst_disk_usage import state from state import UPDATES_CACHE, REBOOT_PENDING from services.metrics import summarize from services.audit import read_audit_tail from services.incidents import read_recent, incidents_path from services.external_checks import format_report +from services.disk_report import build_disk_report @dp.message(F.text == "💽 Disks") @@ -308,6 +310,48 @@ async def incidents(msg: Message): await msg.answer(text, reply_markup=system_logs_audit_kb, parse_mode="Markdown") +@dp.message(F.text == "/incidents_summary") +async def incidents_summary(msg: Message): + if not is_admin_msg(msg): + return + last_24h = read_recent(cfg, hours=24, limit=2000) + last_7d = read_recent(cfg, hours=24 * 7, limit=4000) + + def count(lines): + import re + total = len(lines) + cats = {} + for line in lines: + m = re.search(r"category=([A-Za-z0-9_-]+)", line) + if m: + cats[m.group(1)] = cats.get(m.group(1), 0) + 1 + top = ", ".join(f"{k}:{v}" for k, v in sorted(cats.items(), key=lambda x: x[1], reverse=True)[:5]) or "n/a" + return total, top + + t24, top24 = count(last_24h) + t7, top7 = count(last_7d) + text = ( + "📣 Incidents summary\n\n" + f"24h: {t24} (top: {top24})\n" + f"7d: {t7} (top: {top7})" + ) + await msg.answer(text, reply_markup=system_logs_audit_kb) + + +@dp.message(F.text == "/disk_snapshot") +async def disk_snapshot(msg: Message): + if not is_admin_msg(msg): + return + usage, mount = worst_disk_usage() + mount = mount or "/" + try: + report = await build_disk_report(cfg, mount, usage or 0) + except Exception as e: + await msg.answer(f"⚠️ Disk snapshot error: {e}") + return + await msg.answer(f"💽 Disk snapshot ({mount})\n\n{report}", reply_markup=system_info_kb) + + @dp.message(F.text == "🔒 SSL") async def ssl_certs(msg: Message): if not is_admin_msg(msg): diff --git a/main.py b/main.py index e50a9b1..2f1261e 100644 --- a/main.py +++ b/main.py @@ -14,6 +14,7 @@ from services.ssl_alerts import monitor_ssl from services.external_checks import monitor_external from services.incidents import log_incident from services.logging_setup import setup_logging +from services.selftest import schedule_selftest import state import handlers.menu import handlers.status @@ -73,6 +74,7 @@ async def main(): asyncio.create_task(start_sampler(state.METRICS_STORE, interval=5)) asyncio.create_task(queue_worker()) asyncio.create_task(weekly_reporter(cfg, bot, ADMIN_IDS, state.DOCKER_MAP)) + asyncio.create_task(schedule_selftest(cfg, bot, ADMIN_IDS, state.DOCKER_MAP)) loop = asyncio.get_running_loop() loop.set_exception_handler(_handle_async_exception) await notify_start() diff --git a/services/alerts.py b/services/alerts.py index 2a41b21..1cd6057 100644 --- a/services/alerts.py +++ b/services/alerts.py @@ -12,6 +12,7 @@ async def monitor_resources(cfg, notify, bot, chat_id): cooldown = int(alerts_cfg.get("cooldown_sec", 900)) notify_recovery = bool(alerts_cfg.get("notify_recovery", True)) load_only_critical = bool(alerts_cfg.get("load_only_critical", False)) + auto_mute_high_load_sec = int(alerts_cfg.get("auto_mute_on_high_load_sec", 0)) disk_warn = int(cfg.get("thresholds", {}).get("disk_warn", 80)) snapshot_warn = int(cfg.get("disk_report", {}).get("threshold", disk_warn)) @@ -72,6 +73,10 @@ async def monitor_resources(cfg, notify, bot, chat_id): key = "load_high_crit" if level == 2 else "load_high_warn" await notify(bot, chat_id, f"{icon} Load high: {load:.2f}", level=level_name, key=key, category="load") last_sent["load"] = now + if level == 2 and auto_mute_high_load_sec > 0: + from services.alert_mute import set_mute + + set_mute("load", auto_mute_high_load_sec) state["load_level"] = level await asyncio.sleep(interval) diff --git a/services/openwrt.py b/services/openwrt.py index dfbe324..395b184 100644 --- a/services/openwrt.py +++ b/services/openwrt.py @@ -240,7 +240,7 @@ def _parse_hostapd_clients( *, name_map: dict[str, str] | None = None, ifname_meta: dict[str, dict[str, str]] | None = None, -) -> list[str]: +) -> list[tuple[str, int | None, str]]: if not isinstance(payload, dict): return [] data = payload.get("clients") @@ -248,7 +248,7 @@ def _parse_hostapd_clients( items = data.items() else: return [] - clients: list[str] = [] + clients: list[tuple[str, int | None, str]] = [] name_map = name_map or {} meta = (ifname_meta or {}).get(ifname, {}) ssid = meta.get("ssid") or "" @@ -274,7 +274,8 @@ def _parse_hostapd_clients( client_label = host else: client_label = str(mac) - clients.append(f"{net_label} {client_label} {sig} rx:{rx} tx:{tx}") + line = f"{net_label} {client_label} {sig} rx:{rx} tx:{tx}" + clients.append((line, signal if isinstance(signal, (int, float)) else None, net_label)) return clients @@ -384,6 +385,7 @@ async def get_openwrt_status(cfg: dict[str, Any], mode: str = "full") -> str: if leases_fallback: lease_name_map.update(_extract_lease_name_map_fallback(leases_fallback)) wifi_net_counts: dict[str, int] = {} + wifi_signals: dict[str, list[int]] = {} if ifnames: for ifname in ifnames: cmd_clients = ssh_cmd + ["ubus", "call", f"hostapd.{ifname}", "get_clients"] @@ -397,14 +399,16 @@ async def get_openwrt_status(cfg: dict[str, Any], mode: str = "full") -> str: if isinstance(clients_payload, dict): label = _net_label_for_ifname(ifname, ifname_meta) wifi_net_counts[label] = wifi_net_counts.get(label, 0) + len(clients_payload) - wifi_clients.extend( - _parse_hostapd_clients( - payload, - ifname, - name_map=lease_name_map, - ifname_meta=ifname_meta, - ) + parsed = _parse_hostapd_clients( + payload, + ifname, + name_map=lease_name_map, + ifname_meta=ifname_meta, ) + wifi_clients.extend([p[0] for p in parsed]) + for _line, sig, net_label in parsed: + if sig is not None and net_label: + wifi_signals.setdefault(net_label, []).append(sig) if leases: leases_list = _extract_leases(leases) @@ -422,7 +426,13 @@ async def get_openwrt_status(cfg: dict[str, Any], mode: str = "full") -> str: if wifi_net_counts: wifi_section.append("📶 Wi-Fi networks:") for label, count in sorted(wifi_net_counts.items()): - wifi_section.append(f" - {label}: {count}") + sigs = wifi_signals.get(label) or [] + if sigs: + avg_sig = sum(sigs) / len(sigs) + min_sig = min(sigs) + wifi_section.append(f" - {label}: {count} (avg {avg_sig:.0f}dBm, min {min_sig}dBm)") + else: + wifi_section.append(f" - {label}: {count}") wifi_section.append("") wifi_section.append(f"📶 Wi-Fi clients: {len(wifi_clients)}") diff --git a/services/queue.py b/services/queue.py index d2bfb07..6466698 100644 --- a/services/queue.py +++ b/services/queue.py @@ -16,10 +16,12 @@ _stats: dict[str, Any] = runtime_state.get("queue_stats", {}) or { "last_label": "", "last_finished_at": 0.0, } +_history: deque[dict[str, Any]] = deque(runtime_state.get("queue_history", []) or [], maxlen=50) def _save_stats(): runtime_state.set_state("queue_stats", _stats) + runtime_state.set_state("queue_history", list(_history)) async def enqueue(label: str, job: Callable[[], Awaitable[None]]) -> int: @@ -43,8 +45,11 @@ async def worker(): pass _current_label = label _current_meta = {"enqueued_at": enqueued_at, "started_at": time.time()} + status = "ok" try: await job() + except Exception: + status = "err" finally: finished_at = time.time() if _current_meta: @@ -60,6 +65,15 @@ async def worker(): ) / _stats["processed"] _stats["last_label"] = label _stats["last_finished_at"] = finished_at + _history.appendleft( + { + "label": label, + "wait_sec": int(wait_sec), + "runtime_sec": int(runtime_sec), + "finished_at": int(finished_at), + "status": status, + } + ) _save_stats() _current_label = None _current_meta = None @@ -111,4 +125,13 @@ def format_details(limit: int = 10) -> str: last_label = _stats.get("last_label") if last_label: lines.append(f"Last: {last_label}") + if _history: + lines.append("") + lines.append("🗂 Last jobs:") + for item in list(_history)[:5]: + t = time.strftime("%H:%M:%S", time.localtime(item["finished_at"])) + lines.append( + f"- {t} {item['label']} {item['status']} " + f"(wait {item['wait_sec']}s, run {item['runtime_sec']}s)" + ) return "\n".join(lines) diff --git a/services/selftest.py b/services/selftest.py new file mode 100644 index 0000000..6e775c7 --- /dev/null +++ b/services/selftest.py @@ -0,0 +1,66 @@ +import json +from datetime import datetime, timedelta +import asyncio +from typing import Any + +from services.health import health +from services.runner import run_cmd_full + + +async def run_selftest(cfg: dict[str, Any], docker_map: dict[str, str]) -> str: + lines = ["🧪 Self-test"] + + # health + try: + htext = await asyncio.to_thread(health, cfg, docker_map) + h_lines = [ln for ln in htext.splitlines() if ln.strip()] + brief = " | ".join(h_lines[1:5]) if len(h_lines) > 1 else h_lines[0] if h_lines else "n/a" + lines.append(f"🟢 Health: {brief}") + except Exception as e: + lines.append(f"🔴 Health failed: {e}") + + # restic snapshots check + rc, out = await run_cmd_full(["restic", "snapshots", "--json"], use_restic_env=True, timeout=40) + if rc == 0: + try: + snaps = json.loads(out) + if isinstance(snaps, list) and snaps: + snaps.sort(key=lambda s: s.get("time", ""), reverse=True) + last = snaps[0] + t = last.get("time", "?").replace("Z", "").replace("T", " ")[:16] + lines.append(f"🟢 Restic snapshots: {len(snaps)}, last {t}") + else: + lines.append("🟡 Restic snapshots: empty") + except Exception: + lines.append("🟡 Restic snapshots: invalid JSON") + else: + lines.append(f"🔴 Restic snapshots error: {out.strip() or rc}") + + return "\n".join(lines) + + +async def schedule_selftest(cfg: dict[str, Any], bot, admin_ids: list[int], docker_map: dict[str, str]): + """ + Run selftest daily at configured time. + """ + sched_cfg = cfg.get("selftest", {}).get("schedule", {}) + if not sched_cfg.get("enabled", False): + return + time_str = sched_cfg.get("time", "03:30") + try: + hh, mm = [int(x) for x in time_str.split(":")] + except Exception: + hh, mm = 3, 30 + + while True: + now = datetime.now() + run_at = now.replace(hour=hh, minute=mm, second=0, microsecond=0) + if run_at <= now: + run_at += timedelta(days=1) + await asyncio.sleep((run_at - now).total_seconds()) + text = await run_selftest(cfg, docker_map) + for chat_id in admin_ids: + try: + await bot.send_message(chat_id, text) + except Exception: + pass