diff --git a/handlers/help.py b/handlers/help.py index cb97518..26ca79d 100644 --- a/handlers/help.py +++ b/handlers/help.py @@ -56,11 +56,14 @@ HELP_PAGES = [ "🛠 **Admin & Deploy**\n\n" "Config: `/config_check`, файл `config.yaml` (см. config.example.yaml).\n" "Deploy: `deploy.sh` (ssh 10.10.10.10:1090 → git pull → systemctl restart tg-bot).\n" - "Incidents summary: `/incidents_summary`.\n" - "Incidents export: `/incidents_export [hours] [csv|json]`.\n" - "Alerts log: `/alerts_log [hours]`.\n" + "Incidents: `/incidents_summary`, `/incidents_diff [hours]`.\n" + "Export: `/incidents_export [hours] [csv|json]`, `/export_all [hours]` (zip).\n" + "Alerts log/heatmap: `/alerts_log [hours]`, `/alerts_heatmap [hours] [cat]`.\n" + "Backup SLA: `/backup_sla`; Docker restarts: `/docker_restarts [hours]`.\n" "Disk snapshot: `/disk_snapshot`.\n" - "Queue history: `/queue_history`.\n" + "Queue: `/queue_history`, `/queue_sla`.\n" + "Self-test history: `/selftest_history`.\n" + "OpenWrt leases diff: `/openwrt_leases_diff`.\n" "BotFather list: `/botfather_list`.\n" "Безопасность: `safety.dry_run: true` блокирует опасные действия.\n" "OpenWrt: кнопка в System → Info.", @@ -128,9 +131,11 @@ alerts_list - List active mutes alerts_recent - Show recent incidents (24h) alerts_mute_load - Mute load alerts for 60m alerts_log - Show suppressed alerts +alerts_heatmap - Hourly incidents heatmap backup_run - Run backup (queued) backup_history - Show backup log tail queue_history - Show queue recent jobs +queue_sla - Queue SLA stats docker_status - Docker summary docker_health - Docker inspect/health by alias docker_health_summary - Docker health summary (problems only) @@ -139,8 +144,14 @@ openwrt_wan - OpenWrt WAN only openwrt_clients - OpenWrt wifi clients openwrt_leases - OpenWrt DHCP leases openwrt_fast - OpenWrt quick WAN view +openwrt_leases_diff - OpenWrt DHCP diff incidents_summary - Incidents counters (24h/7d) incidents_export - Export incidents (hours fmt) +incidents_diff - Show 
incidents since last check +export_all - Zip with incidents/queue/selftest +backup_sla - Backup SLA check +docker_restarts - Docker restart history +selftest_history - Self-test history disk_snapshot - Disk usage snapshot config_check - Validate config """ diff --git a/handlers/status.py b/handlers/status.py index 6823e4a..3ae6fb1 100644 --- a/handlers/status.py +++ b/handlers/status.py @@ -126,7 +126,7 @@ async def selftest(msg: Message): await msg.answer("⏳ Self-test…", reply_markup=menu_kb) async def worker(): - text = await run_selftest(cfg, DOCKER_MAP) + text, _ok = await run_selftest(cfg, DOCKER_MAP) await msg.answer(text, reply_markup=menu_kb) asyncio.create_task(worker()) diff --git a/handlers/system.py b/handlers/system.py index f5f98d7..7cd0942 100644 --- a/handlers/system.py +++ b/handlers/system.py @@ -1,5 +1,6 @@ import asyncio import os +from datetime import datetime, timezone, timedelta from aiogram import F from aiogram.types import Message, CallbackQuery, InlineKeyboardMarkup, InlineKeyboardButton, InputFile, BufferedInputFile from app import dp, cfg @@ -13,12 +14,12 @@ from keyboards import ( ) from system_checks import security, disks, hardware, list_disks, smart_last_test from services.http_checks import get_url_checks, check_url -from services.queue import enqueue +from services.queue import enqueue, get_history_raw, get_stats from services.updates import list_updates, apply_updates -from services.runner import run_cmd +from services.runner import run_cmd, run_cmd_full from services.npmplus import fetch_certificates, format_certificates, list_proxy_hosts, set_proxy_host from services.gitea import get_gitea_health -from services.openwrt import get_openwrt_status +from services.openwrt import get_openwrt_status, fetch_openwrt_leases from services.system import worst_disk_usage import state from state import UPDATES_CACHE, REBOOT_PENDING @@ -27,9 +28,11 @@ from services.audit import read_audit_tail from services.incidents import read_recent, 
@dp.message(F.text.startswith("/incidents_diff"))
async def incidents_diff(msg: Message):
    """Report incidents recorded since the previous diff mark.

    Usage: /incidents_diff [hours] — lookback window, default 24h. The
    timestamp of the newest reported incident is persisted so the next
    call only shows what is new.
    """
    if not is_admin_msg(msg):
        return
    args = msg.text.split()
    window_h = 24
    if len(args) >= 2:
        try:
            window_h = max(1, int(args[1]))
        except ValueError:
            window_h = 24
    # Load the persisted high-water mark (ISO timestamp of the last shown incident).
    mark_raw = runtime_state.get("incidents_diff_last_ts")
    mark_dt = None
    if isinstance(mark_raw, str):
        try:
            mark_dt = datetime.fromisoformat(mark_raw)
        except Exception:
            mark_dt = None
    new_rows = [
        (dt, line)
        for dt, line in read_raw(cfg, hours=window_h, limit=5000, include_old=True)
        if not (mark_dt and dt <= mark_dt)
    ]
    if not new_rows:
        note = (
            f"since {mark_dt.astimezone().strftime('%Y-%m-%d %H:%M')}"
            if mark_dt
            else "for the period"
        )
        await msg.answer(
            f"📣 No new incidents {note} (window {window_h}h)",
            reply_markup=system_logs_audit_kb,
        )
        return
    new_rows.sort(key=lambda pair: pair[0])
    body = "\n".join(
        f"{dt.astimezone().strftime('%m-%d %H:%M')} {line}" for dt, line in new_rows[-200:]
    )
    await msg.answer(
        f"📣 New incidents (since last mark, window {window_h}h): {len(new_rows)}\n```\n{body}\n```",
        reply_markup=system_logs_audit_kb,
        parse_mode="Markdown",
    )
    # Best-effort: advance the mark; a state-store failure must not break the reply.
    try:
        runtime_state.set_state("incidents_diff_last_ts", new_rows[-1][0].isoformat())
    except Exception:
        pass
@dp.message(F.text.startswith("/alerts_heatmap"))
async def alerts_heatmap(msg: Message):
    """Render an hourly text heatmap of incidents.

    Usage: /alerts_heatmap [hours] [category] — a numeric argument sets the
    window (default 48h), any other argument filters by incident category.
    """
    if not is_admin_msg(msg):
        return
    window_h = 48
    wanted_cat = None
    for token in msg.text.split()[1:]:
        if token.isdigit():
            window_h = max(1, int(token))
        else:
            wanted_cat = token.lower()
    # Count incidents per local wall-clock hour.
    per_hour: dict[datetime, int] = {}
    for dt, line in read_raw(cfg, hours=window_h, limit=8000, include_old=True):
        cat = infer_category(line) or "n/a"
        if wanted_cat and cat != wanted_cat:
            continue
        slot = dt.astimezone().replace(minute=0, second=0, microsecond=0)
        per_hour[slot] = per_hour.get(slot, 0) + 1
    if not per_hour:
        await msg.answer(
            f"⚠️ No incidents for heatmap (hours={window_h}, category={wanted_cat or 'any'})"
        )
        return
    # Build a continuous hour-by-hour timeline ending now (gaps get count 0).
    cursor = (
        datetime.now().astimezone() - timedelta(hours=window_h - 1)
    ).replace(minute=0, second=0, microsecond=0)
    timeline: list[tuple[datetime, int]] = []
    while cursor <= datetime.now().astimezone():
        timeline.append((cursor, per_hour.get(cursor, 0)))
        cursor += timedelta(hours=1)
    peak = max(cnt for _, cnt in timeline) or 1
    out = [f"📊 Alerts heatmap (hours={window_h}, category={wanted_cat or 'any'})"]
    for slot, cnt in timeline[-72:]:
        # Bars scale to 12 cells; any non-zero count shows at least one cell.
        width = max(1, int(cnt / peak * 12)) if cnt else 0
        bar = "▇" * width if cnt else ""
        out.append(f"{slot:%m-%d %H}: {cnt:>3} {bar}")
    await msg.answer("\n".join(out), reply_markup=system_logs_audit_kb)
snaps.sort(key=lambda s: s.get("time", ""), reverse=True) + last_time_raw = snaps[0].get("time") + if last_time_raw: + last_dt = datetime.fromisoformat(last_time_raw.replace("Z", "+00:00")) + age_h = (datetime.now(tz=last_dt.tzinfo) - last_dt).total_seconds() / 3600 + else: + last_dt = None + age_h = None + sla = float(cfg.get("backup", {}).get("sla_hours", 26)) + if age_h is None: + status = "🟡" + elif age_h <= sla: + status = "🟢" + elif age_h <= sla * 1.5: + status = "🟡" + else: + status = "🔴" + last_str = last_dt.astimezone().strftime("%Y-%m-%d %H:%M") if last_dt else "n/a" + age_str = f"{age_h:.1f}h" if age_h is not None else "n/a" + await msg.answer( + f"{status} Backup SLA\n" + f"Snapshots: {len(snaps)}\n" + f"Last: {last_str} (age {age_str})\n" + f"SLA: {sla}h", + reply_markup=system_logs_audit_kb, + ) + + +@dp.message(F.text.startswith("/docker_restarts")) +async def docker_restarts(msg: Message): + if not is_admin_msg(msg): + return + parts = msg.text.split() + hours = 168 + if len(parts) >= 2: + try: + hours = max(1, int(parts[1])) + except ValueError: + hours = 168 + rows = read_raw(cfg, hours=hours, limit=5000, include_old=True) + restarts = [(dt, line) for dt, line in rows if "docker_restart" in line] + if not restarts: + await msg.answer(f"🐳 No docker restarts in last {hours}h", reply_markup=system_logs_audit_kb) + return + counts: dict[str, int] = {} + for _dt, line in restarts: + parts_line = line.split() + if len(parts_line) >= 2: + alias = parts_line[-1] + else: + alias = "unknown" + counts[alias] = counts.get(alias, 0) + 1 + top = ", ".join(f"{k}:{v}" for k, v in sorted(counts.items(), key=lambda x: x[1], reverse=True)) + body = "\n".join(f"{dt.astimezone().strftime('%m-%d %H:%M')} {line}" for dt, line in restarts[-50:]) + await msg.answer( + f"🐳 Docker restarts ({hours}h): {len(restarts)} ({top})\n```\n{body}\n```", + reply_markup=system_logs_audit_kb, + parse_mode="Markdown", + ) + + +@dp.message(F.text.startswith("/openwrt_leases_diff")) 
async def openwrt_leases_diff(msg: Message):
    """Diff current OpenWrt DHCP leases against the previously saved set.

    The first run stores a baseline; later runs report added/removed leases
    (up to 50 each) and refresh the stored snapshot. Work happens in a
    background task so the handler returns immediately.
    """
    if not is_admin_msg(msg):
        return
    await msg.answer("⏳ OpenWrt leases diff…", reply_markup=system_info_kb)

    async def worker():
        try:
            current = await fetch_openwrt_leases(cfg)
        except Exception as e:
            await msg.answer(f"⚠️ OpenWrt error: {e}", reply_markup=system_info_kb)
            return
        baseline = runtime_state.get("openwrt_leases_prev", [])
        if not baseline:
            runtime_state.set_state("openwrt_leases_prev", current)
            await msg.answer(f"Baseline saved ({len(current)} leases)", reply_markup=system_info_kb)
            return
        before, after = set(baseline), set(current)
        added = sorted(after - before)
        removed = sorted(before - after)
        report = [f"🧾 OpenWrt leases diff ({len(added)} added, {len(removed)} removed)"]
        if added:
            report.append("➕ Added:")
            report += [f" - {x}" for x in added[:50]]
        if removed:
            report.append("➖ Removed:")
            report += [f" - {x}" for x in removed[:50]]
        if len(added) > 50 or len(removed) > 50:
            report.append("… trimmed")
        runtime_state.set_state("openwrt_leases_prev", current)
        await msg.answer("\n".join(report), reply_markup=system_info_kb)

    asyncio.create_task(worker())


@dp.message(F.text == "/queue_sla")
async def queue_sla(msg: Message):
    """Show queue wait/runtime statistics (max, p95, averages)."""
    if not is_admin_msg(msg):
        return
    hist = get_history_raw()
    if not hist:
        await msg.answer("🧾 Queue SLA: history is empty", reply_markup=system_logs_audit_kb)
        return
    waits = [entry.get("wait_sec", 0) for entry in hist]
    runs = [entry.get("runtime_sec", 0) for entry in hist]

    def p95(values: list[int]) -> float:
        # Nearest-rank 95th percentile over the sorted sample.
        if not values:
            return 0.0
        ordered = sorted(values)
        return float(ordered[int(0.95 * (len(ordered) - 1))])

    report = [
        "🧾 Queue SLA",
        f"Samples: {len(hist)}",
        f"Max wait: {max(waits)}s, p95 {p95(waits):.1f}s",
        f"Max run: {max(runs)}s, p95 {p95(runs):.1f}s",
    ]
    stats = get_stats()
    if stats:
        report.append(
            f"Avg wait: {stats.get('avg_wait_sec', 0):.1f}s, "
            f"avg run: {stats.get('avg_runtime_sec', 0):.1f}s"
        )
    await msg.answer("\n".join(report), reply_markup=system_logs_audit_kb)
@dp.message(F.text == "/selftest_history")
async def selftest_history(msg: Message):
    """Show the most recent self-test results (up to 15 entries)."""
    if not is_admin_msg(msg):
        return
    hist = runtime_state.get("selftest_history", [])
    if not hist:
        await msg.answer("🧪 Self-test history is empty", reply_markup=system_logs_audit_kb)
        return
    out = ["🧪 Self-test history"]
    for entry in hist[:15]:
        mark = "🟢" if entry.get("ok", False) else "🔴"
        out.append(f"{mark} {entry.get('ts', '')[:16]} {entry.get('summary', '')}")
    await msg.answer("\n".join(out), reply_markup=system_logs_audit_kb)


@dp.message(F.text.startswith("/export_all"))
async def export_all(msg: Message):
    """Export incidents, queue history and self-test history as a zip of CSVs.

    Usage: /export_all [hours] — incidents window, default 168h. CSVs use
    ';' as delimiter; the archive is sent as export_all.zip.
    """
    if not is_admin_msg(msg):
        return
    hours = 168
    args = msg.text.split()
    if len(args) >= 2:
        try:
            hours = max(1, int(args[1]))
        except ValueError:
            hours = 168

    def make_csv(header, rows_iter):
        # Build one ;-separated CSV document in memory.
        sio = io.StringIO()
        writer = csv.writer(sio, delimiter=";")
        writer.writerow(header)
        for row in rows_iter:
            writer.writerow(row)
        return sio.getvalue()

    incidents_rows = read_raw(cfg, hours=hours, limit=20000, include_old=True)
    incidents_csv = make_csv(
        ["timestamp", "category", "message"],
        (
            [dt.astimezone().isoformat(), infer_category(line) or "n/a", line]
            for dt, line in incidents_rows
        ),
    )

    queue_rows = get_history_raw()
    queue_csv = make_csv(
        ["finished_at", "label", "status", "wait_sec", "runtime_sec"],
        (
            [
                datetime.fromtimestamp(item.get("finished_at", 0)).isoformat()
                if item.get("finished_at")
                else "",
                item.get("label", ""),
                item.get("status", ""),
                item.get("wait_sec", ""),
                item.get("runtime_sec", ""),
            ]
            for item in queue_rows
        ),
    )

    st_hist = runtime_state.get("selftest_history", [])
    selftest_csv = make_csv(
        ["timestamp", "ok", "summary"],
        (
            [entry.get("ts", ""), entry.get("ok", False), entry.get("summary", "")]
            for entry in st_hist
        ),
    )

    archive = io.BytesIO()
    with zipfile.ZipFile(archive, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        zf.writestr(f"incidents_{hours}h.csv", incidents_csv)
        zf.writestr("queue_history.csv", queue_csv)
        zf.writestr("selftest_history.csv", selftest_csv)
    archive.seek(0)
    await msg.answer(
        f"📦 Export all\n"
        f"Incidents: {len(incidents_rows)} rows ({hours}h)\n"
        f"Queue history: {len(queue_rows)} rows\n"
        f"Selftest: {len(st_hist)} rows"
    )
    await msg.answer_document(document=BufferedInputFile(archive.read(), filename="export_all.zip"))
async def fetch_openwrt_leases(cfg: dict[str, Any]) -> list[str]:
    """Fetch OpenWrt DHCP leases over SSH.

    Tries `ubus call luci-rpc getDHCPLeases` first and falls back to
    reading /tmp/dhcp.leases; returns one string per lease
    ("IP hostname (MAC)").

    Raises:
        RuntimeError: when the host is not configured, the SSH command
            times out (rc 124 — run_cmd_full's timeout convention, presumably;
            verify), or exits non-zero.
    """
    ow_cfg = cfg.get("openwrt", {})
    host = ow_cfg.get("host")
    user = ow_cfg.get("user", "root")
    port = ow_cfg.get("port", 22)
    identity_file = ow_cfg.get("identity_file")
    timeout_sec = ow_cfg.get("timeout_sec", 8)
    strict = ow_cfg.get("strict_host_key_checking", True)

    if not host:
        raise RuntimeError("OpenWrt host not configured")

    ssh_cmd = [
        "ssh",
        "-o", "BatchMode=yes",
        "-o", f"ConnectTimeout={timeout_sec}",
        "-o", "LogLevel=ERROR",
    ]
    if not strict:
        ssh_cmd += ["-o", "StrictHostKeyChecking=no", "-o", "UserKnownHostsFile=/dev/null"]
    if identity_file:
        ssh_cmd += ["-i", str(identity_file)]
    ssh_cmd += ["-p", str(port), f"{user}@{host}"]

    remote = "ubus call luci-rpc getDHCPLeases '{\"family\":4}' 2>/dev/null || cat /tmp/dhcp.leases"
    # Pass the pipeline as a single ssh argument. ssh joins extra argv
    # elements with spaces into one remote command line, so the previous
    # `["sh", "-c", remote]` form made the remote shell execute
    # `sh -c ubus call ...` — sh -c took only "ubus" as its command string,
    # the ubus path always failed, and every call silently fell through to
    # the /tmp/dhcp.leases fallback. The remote login shell can run the
    # `... || ...` pipeline directly.
    rc, out = await run_cmd_full(ssh_cmd + [remote], timeout=timeout_sec + 10)
    if rc == 124:
        raise RuntimeError("timeout")
    if rc != 0:
        raise RuntimeError(out.strip() or f"ssh rc={rc}")
    leases = _safe_json_load(out)
    if leases:
        return _extract_leases(leases)
    return _parse_leases_fallback(out)
def _save_history(entry: dict[str, Any]) -> None:
    """Prepend a self-test result to the persisted history (newest first).

    The stored list is capped at 20 entries; non-list state (unset or
    corrupt) resets to an empty history. The previous implementation
    trimmed to 50 first and then to 20 — the first trim was redundant and
    is removed; a copy is still taken so the list returned by
    runtime_state.get is never mutated in place.
    """
    prev = runtime_state.get("selftest_history", [])
    # Copy before mutating — runtime_state.get may hand back a live reference.
    hist = list(prev) if isinstance(prev, list) else []
    hist.insert(0, entry)
    runtime_state.set_state("selftest_history", hist[:20])