diff --git a/CONFIG.en.md b/CONFIG.en.md index e3314ad..e859f92 100644 --- a/CONFIG.en.md +++ b/CONFIG.en.md @@ -88,6 +88,11 @@ This project uses `config.yaml`. Start from `config.example.yaml`. - `schedule.enabled` (bool): Enable auto self-test. - `schedule.time` (string): Local time `HH:MM` (default `03:30`). +## queue + +- `max_pending_alert` (int): Alert if pending tasks >= this value. +- `avg_wait_alert` (int): Alert if average wait >= N seconds. +- `cooldown_sec` (int): Cooldown between queue alerts (default 300s). ## external_checks - `enabled` (bool): Enable background checks. diff --git a/CONFIG.md b/CONFIG.md index e4ad9f6..f16f7fc 100644 --- a/CONFIG.md +++ b/CONFIG.md @@ -88,6 +88,12 @@ - `schedule.enabled` (bool): включить авто self-test. - `schedule.time` (string): локальное время `HH:MM`, по умолчанию `03:30`. +## queue + +- `max_pending_alert` (int): алерт, если задач в очереди >= этому значению. +- `avg_wait_alert` (int): алерт, если среднее ожидание >= N секунд. +- `cooldown_sec` (int): кулдаун между алертами очереди, по умолчанию 300с. + ## external_checks - `enabled` (bool): включить фоновые проверки. 
diff --git a/config.example.yaml b/config.example.yaml index a35f81f..81f3607 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -85,6 +85,11 @@ selftest: enabled: false time: "03:30" +queue: + max_pending_alert: 5 + avg_wait_alert: 120 + cooldown_sec: 300 + external_checks: enabled: true state_path: "/var/server-bot/external_checks.json" diff --git a/handlers/backup.py b/handlers/backup.py index fcfbad1..6a9b3c0 100644 --- a/handlers/backup.py +++ b/handlers/backup.py @@ -8,9 +8,10 @@ from app import dp, cfg from auth import is_admin_msg, is_admin_cb from keyboards import backup_kb from lock_utils import acquire_lock, release_lock -from services.queue import enqueue, format_status, format_details +from services.queue import enqueue, format_status, format_details, format_history from services.backup import backup_badge, restore_help from services.runner import run_cmd, run_cmd_full +from services.incidents import log_incident def _parse_systemctl_kv(raw: str) -> dict[str, str]: @@ -551,6 +552,13 @@ async def backup_history(msg: Message): ) +@dp.message(F.text == "/queue_history") +async def queue_history(msg: Message): + if not is_admin_msg(msg): + return + await msg.answer(format_history(), reply_markup=backup_kb) + + @dp.callback_query(F.data == "backup:retry") async def backup_retry(cb: CallbackQuery): if not is_admin_cb(cb): diff --git a/handlers/docker.py b/handlers/docker.py index 3d29c41..f950dc2 100644 --- a/handlers/docker.py +++ b/handlers/docker.py @@ -124,6 +124,37 @@ async def docker_health(msg: Message): log_incident(cfg, f"docker_health alias={alias} by {msg.from_user.id}", category="docker") +@dp.message(F.text == "/docker_health_summary") +async def docker_health_summary(msg: Message): + if not is_admin_msg(msg): + return + if not DOCKER_MAP: + await msg.answer("⚠️ DOCKER_MAP пуст", reply_markup=docker_kb) + return + problems = [] + total = len(DOCKER_MAP) + for alias, real in DOCKER_MAP.items(): + rc, out = await docker_cmd(["inspect", 
"-f", "{{json .State}}", real], timeout=10) + if rc != 0: + problems.append(f"{alias}: inspect error") + continue + try: + state = json.loads(out) + except Exception: + problems.append(f"{alias}: bad JSON") + continue + status = state.get("Status", "n/a") + health = (state.get("Health") or {}).get("Status", "n/a") + if status != "running" or health not in ("healthy", "none"): + problems.append(f"{alias}: {status}/{health}") + ok = total - len(problems) + lines = [f"🐳 Docker health: 🟢 {ok}/{total} healthy, 🔴 {len(problems)} issues"] + if problems: + lines.append("Problems:") + lines.extend([f"- {p}" for p in problems]) + await msg.answer("\n".join(lines), reply_markup=docker_kb) + + @dp.message(F.text == "📈 Stats") async def dstats(msg: Message): if not is_admin_msg(msg): diff --git a/handlers/help.py b/handlers/help.py index 5db9ba8..cb97518 100644 --- a/handlers/help.py +++ b/handlers/help.py @@ -57,7 +57,10 @@ HELP_PAGES = [ "Config: `/config_check`, файл `config.yaml` (см. config.example.yaml).\n" "Deploy: `deploy.sh` (ssh 10.10.10.10:1090 → git pull → systemctl restart tg-bot).\n" "Incidents summary: `/incidents_summary`.\n" + "Incidents export: `/incidents_export [hours] [csv|json]`.\n" + "Alerts log: `/alerts_log [hours]`.\n" "Disk snapshot: `/disk_snapshot`.\n" + "Queue history: `/queue_history`.\n" "BotFather list: `/botfather_list`.\n" "Безопасность: `safety.dry_run: true` блокирует опасные действия.\n" "OpenWrt: кнопка в System → Info.", @@ -124,15 +127,20 @@ alerts - Manage alerts alerts_list - List active mutes alerts_recent - Show recent incidents (24h) alerts_mute_load - Mute load alerts for 60m +alerts_log - Show suppressed alerts backup_run - Run backup (queued) backup_history - Show backup log tail +queue_history - Show queue recent jobs docker_status - Docker summary docker_health - Docker inspect/health by alias +docker_health_summary - Docker health summary (problems only) openwrt - Full OpenWrt status openwrt_wan - OpenWrt WAN only 
openwrt_clients - OpenWrt wifi clients openwrt_leases - OpenWrt DHCP leases +openwrt_fast - OpenWrt quick WAN view incidents_summary - Incidents counters (24h/7d) +incidents_export - Export incidents (hours fmt) disk_snapshot - Disk usage snapshot config_check - Validate config """ diff --git a/handlers/system.py b/handlers/system.py index 484e0fa..11a0ae6 100644 --- a/handlers/system.py +++ b/handlers/system.py @@ -24,9 +24,12 @@ import state from state import UPDATES_CACHE, REBOOT_PENDING from services.metrics import summarize from services.audit import read_audit_tail -from services.incidents import read_recent, incidents_path +from services.incidents import read_recent, incidents_path, read_raw, infer_category from services.external_checks import format_report from services.disk_report import build_disk_report +import io +import json +import csv @dp.message(F.text == "💽 Disks") @@ -269,6 +272,22 @@ async def openwrt_leases(msg: Message): asyncio.create_task(worker()) +@dp.message(F.text == "/openwrt_fast") +async def openwrt_fast(msg: Message): + if not is_admin_msg(msg): + return + await msg.answer("⏳ OpenWrt fast…", reply_markup=system_info_kb) + + async def worker(): + try: + text = await get_openwrt_status(cfg, mode="wan") + except Exception as e: + text = f"⚠️ OpenWrt error: {e}" + await msg.answer(text, reply_markup=system_info_kb) + + asyncio.create_task(worker()) + + @dp.message(F.text == "🧾 Audit") async def audit_log(msg: Message): if not is_admin_msg(msg): @@ -314,26 +333,36 @@ async def incidents(msg: Message): async def incidents_summary(msg: Message): if not is_admin_msg(msg): return - last_24h = read_recent(cfg, hours=24, limit=2000) - last_7d = read_recent(cfg, hours=24 * 7, limit=4000) + last_24h = read_raw(cfg, hours=24, limit=2000) + last_7d = read_raw(cfg, hours=24 * 7, limit=4000) - def count(lines): - import re - total = len(lines) + def summarize(items): + total = len(items) cats = {} - for line in lines: - m = 
re.search(r"category=([A-Za-z0-9_-]+)", line) - if m: - cats[m.group(1)] = cats.get(m.group(1), 0) + 1 - top = ", ".join(f"{k}:{v}" for k, v in sorted(cats.items(), key=lambda x: x[1], reverse=True)[:5]) or "n/a" - return total, top + suppressed = {} + last_seen = {} + for dt, msg in items: + cat = infer_category(msg) or "n/a" + cats[cat] = cats.get(cat, 0) + 1 + last_seen[cat] = dt + if "[suppressed" in msg.lower(): + suppressed[cat] = suppressed.get(cat, 0) + 1 + def fmt_top(d): + return ", ".join(f"{k}:{v}" for k, v in sorted(d.items(), key=lambda x: x[1], reverse=True)[:5]) or "n/a" + top = fmt_top(cats) + top_supp = fmt_top(suppressed) + last_parts = [] + for k, dt in sorted(last_seen.items(), key=lambda x: x[1], reverse=True)[:5]: + last_parts.append(f"{k}:{dt.astimezone().strftime('%Y-%m-%d %H:%M')}") + last_str = ", ".join(last_parts) or "n/a" + return total, top, top_supp, last_str - t24, top24 = count(last_24h) - t7, top7 = count(last_7d) + t24, top24, supp24, last24 = summarize(last_24h) + t7, top7, supp7, last7 = summarize(last_7d) text = ( "📣 Incidents summary\n\n" - f"24h: {t24} (top: {top24})\n" - f"7d: {t7} (top: {top7})" + f"24h: {t24} (top: {top24}; suppressed: {supp24}; last: {last24})\n" + f"7d: {t7} (top: {top7}; suppressed: {supp7}; last: {last7})" ) await msg.answer(text, reply_markup=system_logs_audit_kb) @@ -352,6 +381,68 @@ async def disk_snapshot(msg: Message): await msg.answer(f"💽 Disk snapshot ({mount})\n\n{report}", reply_markup=system_info_kb) +@dp.message(F.text.startswith("/alerts_log")) +async def alerts_log(msg: Message): + if not is_admin_msg(msg): + return + parts = msg.text.split() + hours = 24 + if len(parts) >= 2: + try: + hours = max(1, int(parts[1])) + except ValueError: + hours = 24 + rows = read_raw(cfg, hours=hours, limit=2000) + suppressed = [(dt, m) for dt, m in rows if "[suppressed" in m.lower()] + sent = [(dt, m) for dt, m in rows if "[suppressed" not in m.lower()] + lines = [f"📣 Alerts log ({hours}h)"] + 
lines.append(f"Sent: {len(sent)}, Suppressed: {len(suppressed)}") + if suppressed: + lines.append("\nSuppressed:") + for dt, m in suppressed[-20:]: + lines.append(f"{dt:%m-%d %H:%M} {m}") + await msg.answer("\n".join(lines), reply_markup=system_logs_audit_kb) + + +@dp.message(F.text.startswith("/incidents_export")) +async def incidents_export(msg: Message): + if not is_admin_msg(msg): + return + parts = msg.text.split() + hours = 24 + fmt = "csv" + if len(parts) >= 2: + try: + hours = max(1, int(parts[1])) + except ValueError: + hours = 24 + if len(parts) >= 3: + fmt = parts[2].lower() + rows = read_raw(cfg, hours=hours, limit=20000, include_old=False) + data = [] + for dt, msg_line in rows: + data.append({ + "timestamp": dt.astimezone().isoformat(), + "category": infer_category(msg_line) or "n/a", + "message": msg_line, + }) + if fmt == "json": + payload = json.dumps(data, ensure_ascii=False, indent=2) + bio = io.BytesIO(payload.encode("utf-8")) + bio.name = f"incidents_{hours}h.json" + else: + sio = io.StringIO() + writer = csv.DictWriter(sio, fieldnames=["timestamp", "category", "message"]) + writer.writeheader() + for row in data: + writer.writerow(row) + bio = io.BytesIO(sio.getvalue().encode("utf-8")) + bio.name = f"incidents_{hours}h.csv" + summary = f"📤 Incidents export ({hours}h): {len(data)} rows, format {fmt}" + await msg.answer(summary) + await msg.answer_document(document=bio) + + @dp.message(F.text == "🔒 SSL") async def ssl_certs(msg: Message): if not is_admin_msg(msg): diff --git a/main.py b/main.py index b2802cd..d1974ed 100644 --- a/main.py +++ b/main.py @@ -7,7 +7,7 @@ from keyboards import menu_kb from services.docker import discover_containers, docker_watchdog from services.alerts import monitor_resources, monitor_smart from services.metrics import MetricsStore, start_sampler -from services.queue import worker as queue_worker +from services.queue import worker as queue_worker, configure as queue_configure from services.notify import notify from 
services.audit import AuditMiddleware, audit_start from services.ssl_alerts import monitor_ssl @@ -38,6 +38,22 @@ def _handle_async_exception(_loop, context): text = f"❌ {msg}: {type(exc).__name__}: {exc}" else: text = f"❌ {msg}" + now = datetime.now() + if not hasattr(_handle_async_exception, "_recent"): + _handle_async_exception._recent = [] + _handle_async_exception._last_alert = None + recent = _handle_async_exception._recent + recent.append(now) + # keep last hour + _handle_async_exception._recent = [t for t in recent if (now - t).total_seconds() < 3600] + if len(_handle_async_exception._recent) >= 3: + last_alert = getattr(_handle_async_exception, "_last_alert", None) + if not last_alert or (now - last_alert).total_seconds() > 3600: + try: + log_incident(cfg, "exception_flood", category="system") + except Exception: + pass + _handle_async_exception._last_alert = now try: log_incident(cfg, text, category="system") except Exception: @@ -72,6 +88,7 @@ async def main(): asyncio.create_task(monitor_external(cfg)) state.METRICS_STORE = MetricsStore() asyncio.create_task(start_sampler(state.METRICS_STORE, interval=5)) + queue_configure(cfg.get("queue", {}), cfg) asyncio.create_task(queue_worker()) asyncio.create_task(weekly_reporter(cfg, bot, ADMIN_IDS, state.DOCKER_MAP)) asyncio.create_task(schedule_selftest(cfg, bot, ADMIN_IDS, state.DOCKER_MAP)) diff --git a/services/incidents.py b/services/incidents.py index aa0b502..fe7d660 100644 --- a/services/incidents.py +++ b/services/incidents.py @@ -65,6 +65,10 @@ def _parse_line(line: str) -> tuple[datetime | None, str]: def read_recent(cfg: dict[str, Any], hours: int, limit: int = 200) -> list[str]: + return [f"{dt:%Y-%m-%d %H:%M} {msg}" for dt, msg in read_raw(cfg, hours, limit=limit)] + + +def read_raw(cfg: dict[str, Any], hours: int, limit: int = 200, *, include_old: bool = False) -> list[tuple[datetime, str]]: path = _get_path(cfg) if not os.path.exists(path): return [] @@ -74,7 +78,40 @@ def read_recent(cfg: 
dict[str, Any], hours: int, limit: int = 200) -> list[str]: with open(path, "r", encoding="utf-8", errors="replace") as f: for line in f: dt, msg = _parse_line(line.rstrip()) - if dt is None or dt < since: + if dt is None: continue - lines.append(f"{dt:%Y-%m-%d %H:%M} {msg}") + if not include_old and dt < since: + continue + lines.append((dt, msg)) return list(lines) + + +def infer_category(text: str) -> str | None: + lower = text.lower() + if "category=" in lower: + import re + + m = re.search(r"category=([a-z0-9_-]+)", lower) + if m: + return m.group(1) + if "load" in lower: + return "load" + if "docker" in lower: + return "docker" + if "restic" in lower or "backup" in lower: + return "backup" + if "smart" in lower: + return "smart" + if "ssl" in lower or "cert" in lower: + return "ssl" + if "npmplus" in lower: + return "npmplus" + if "gitea" in lower: + return "gitea" + if "openwrt" in lower: + return "openwrt" + if "queue" in lower: + return "queue" + if "selftest" in lower: + return "selftest" + return None diff --git a/services/queue.py b/services/queue.py index 6466698..2b8e1a4 100644 --- a/services/queue.py +++ b/services/queue.py @@ -3,6 +3,7 @@ import time from collections import deque from typing import Awaitable, Callable, Any from services import runtime_state +from services.incidents import log_incident _queue: asyncio.Queue = asyncio.Queue() @@ -17,6 +18,13 @@ _stats: dict[str, Any] = runtime_state.get("queue_stats", {}) or { "last_finished_at": 0.0, } _history: deque[dict[str, Any]] = deque(runtime_state.get("queue_history", []) or [], maxlen=50) +_alert_cfg: dict[str, Any] = { + "max_pending": None, + "avg_wait": None, + "cooldown": 300, + "last_sent": 0.0, +} +_cfg: dict[str, Any] | None = None def _save_stats(): @@ -24,10 +32,39 @@ def _save_stats(): runtime_state.set_state("queue_history", list(_history)) +def configure(queue_cfg: dict[str, Any], cfg: dict[str, Any]): + global _cfg + _cfg = cfg + _alert_cfg["max_pending"] = 
queue_cfg.get("max_pending_alert") + _alert_cfg["avg_wait"] = queue_cfg.get("avg_wait_alert") + _alert_cfg["cooldown"] = queue_cfg.get("cooldown_sec", 300) + + +def _check_congestion(pending_count: int, avg_wait: float | None): + max_pending = _alert_cfg.get("max_pending") + avg_wait_thr = _alert_cfg.get("avg_wait") + cooldown = _alert_cfg.get("cooldown", 300) + now = time.time() + if now - _alert_cfg.get("last_sent", 0) < cooldown: + return + reason = None + if max_pending and pending_count >= max_pending: + reason = f"pending={pending_count} >= {max_pending}" + if avg_wait_thr and avg_wait is not None and avg_wait >= avg_wait_thr: + reason = reason or f"avg_wait={avg_wait:.1f}s >= {avg_wait_thr}s" + if reason and _cfg: + try: + log_incident(_cfg, f"queue_congested {reason}", category="queue") + except Exception: + pass + _alert_cfg["last_sent"] = now + + async def enqueue(label: str, job: Callable[[], Awaitable[None]]) -> int: enqueued_at = time.time() await _queue.put((label, job, enqueued_at)) _pending.append((label, enqueued_at)) + _check_congestion(len(_pending), None) return len(_pending) @@ -75,6 +112,7 @@ async def worker(): } ) _save_stats() + _check_congestion(len(_pending), _stats.get("avg_wait_sec")) _current_label = None _current_meta = None _queue.task_done() @@ -135,3 +173,17 @@ def format_details(limit: int = 10) -> str: f"(wait {item['wait_sec']}s, run {item['runtime_sec']}s)" ) return "\n".join(lines) + + +def format_history(limit: int = 20) -> str: + lines = ["🗂 Queue history"] + if not _history: + lines.append("(empty)") + return "\n".join(lines) + for item in list(_history)[:limit]: + t = time.strftime("%m-%d %H:%M:%S", time.localtime(item["finished_at"])) + lines.append( + f"{t} {item['label']} {item['status']} " + f"(wait {item['wait_sec']}s, run {item['runtime_sec']}s)" + ) + return "\n".join(lines) diff --git a/services/selftest.py b/services/selftest.py index 6e775c7..323c424 100644 --- a/services/selftest.py +++ b/services/selftest.py @@ 
-5,10 +5,12 @@ from typing import Any from services.health import health from services.runner import run_cmd_full +from services.incidents import log_incident -async def run_selftest(cfg: dict[str, Any], docker_map: dict[str, str]) -> str: +async def run_selftest(cfg: dict[str, Any], docker_map: dict[str, str]) -> tuple[str, bool]: lines = ["🧪 Self-test"] + ok = True # health try: @@ -18,6 +20,7 @@ async def run_selftest(cfg: dict[str, Any], docker_map: dict[str, str]) -> str: lines.append(f"🟢 Health: {brief}") except Exception as e: lines.append(f"🔴 Health failed: {e}") + ok = False # restic snapshots check rc, out = await run_cmd_full(["restic", "snapshots", "--json"], use_restic_env=True, timeout=40) @@ -35,8 +38,9 @@ async def run_selftest(cfg: dict[str, Any], docker_map: dict[str, str]) -> str: lines.append("🟡 Restic snapshots: invalid JSON") else: lines.append(f"🔴 Restic snapshots error: {out.strip() or rc}") + ok = False - return "\n".join(lines) + return "\n".join(lines), ok async def schedule_selftest(cfg: dict[str, Any], bot, admin_ids: list[int], docker_map: dict[str, str]): @@ -58,9 +62,14 @@ async def schedule_selftest(cfg: dict[str, Any], bot, admin_ids: list[int], dock if run_at <= now: run_at += timedelta(days=1) await asyncio.sleep((run_at - now).total_seconds()) - text = await run_selftest(cfg, docker_map) + text, ok = await run_selftest(cfg, docker_map) for chat_id in admin_ids: try: await bot.send_message(chat_id, text) except Exception: pass + if not ok: + try: + log_incident(cfg, "selftest failed", category="selftest") + except Exception: + pass