import asyncio import os from datetime import datetime, timezone, timedelta from aiogram import F from aiogram.types import Message, CallbackQuery, InlineKeyboardMarkup, InlineKeyboardButton, InputFile, BufferedInputFile from app import dp, cfg from auth import is_admin_msg from keyboards import ( system_info_kb, system_ops_kb, system_logs_audit_kb, system_logs_security_kb, system_logs_integrations_kb, system_logs_kb, openwrt_kb, docker_kb, backup_kb, ) from system_checks import security, disks, hardware, list_disks, smart_last_test from services.http_checks import get_url_checks, check_url from services.queue import enqueue, get_history_raw, get_stats from services.updates import list_updates, apply_updates from services.runner import run_cmd, run_cmd_full from services.npmplus import fetch_certificates, format_certificates, list_proxy_hosts, set_proxy_host from services.gitea import get_gitea_health from services.openwrt import get_openwrt_status, fetch_openwrt_leases from services.system import worst_disk_usage import state from state import UPDATES_CACHE, REBOOT_PENDING from services.metrics import summarize from services.audit import read_audit_tail from services.incidents import read_recent, incidents_path, read_raw, infer_category from services.external_checks import format_report from services.disk_report import build_disk_report from services import runtime_state import io import json import csv import zipfile @dp.message(F.text == "💽 Disks") async def sd(msg: Message): if is_admin_msg(msg): await msg.answer(disks(), reply_markup=system_info_kb) @dp.message(F.text == "🔐 Security") async def sec(msg: Message): if is_admin_msg(msg): await msg.answer(security(), reply_markup=system_info_kb) @dp.message(F.text == "🌐 URLs") async def urls(msg: Message): if not is_admin_msg(msg): return checks = list(get_url_checks(cfg)) if not checks: await msg.answer("⚠️ Нет URL для проверки", reply_markup=system_logs_security_kb) return await msg.answer("⏳ Проверяю URL…", reply_markup=system_logs_security_kb) async def worker(): tasks = [asyncio.to_thread(check_url, url) for _, url in checks] results = await asyncio.gather(*tasks) lines = ["🌐 URLs\n"] for (alias, _url), (ok, status, ms, err) in zip(checks, results): if ok: lines.append(f"🟢 {alias}: {status} ({ms}ms)") elif status is not None: lines.append(f"🔴 {alias}: {status} ({ms}ms)") else: reason = err or "error" lines.append(f"🔴 {alias}: {reason} ({ms}ms)") await msg.answer("\n".join(lines), reply_markup=system_logs_security_kb) asyncio.create_task(worker()) @dp.message(F.text == "🔑 SSH log") async def ssh_log(msg: Message): if not is_admin_msg(msg): return await msg.answer("⏳ Loading SSH logins…", reply_markup=system_logs_security_kb) async def worker(): raw_lines: list[str] = [] rc, out = await run_cmd( [ "journalctl", "-t", "sshd", "-t", "sshd-session", "--since", "00:00", "--no-pager", ], timeout=20, ) if rc == 0 and out.strip(): for line in out.splitlines(): if "Accepted" in line: raw_lines.append(line) if not raw_lines: rc2, out2 = await run_cmd(["tail", "-n", "200", "/var/log/auth.log"], timeout=10) if rc2 == 0 and out2.strip(): for line in out2.splitlines(): if "Accepted" in line: raw_lines.append(line) if not raw_lines: await msg.answer("🔑 SSH log\n\n(no logins today)", reply_markup=system_logs_security_kb) return recent = raw_lines[-30:] pretty = [_format_ssh_line(line) for line in recent] text = "🔑 SSH logins (today)\n```\n" + "\n".join(pretty) + "\n```" await msg.answer(text, reply_markup=system_logs_security_kb, parse_mode="Markdown") asyncio.create_task(worker()) def _format_ssh_line(line: str) -> str: parts = line.split() if len(parts) < 10: return line month, day, time_str = parts[0], parts[1], parts[2] user = "?" ip = "?" key_type = "?" if "for" in parts: i = parts.index("for") if i + 1 < len(parts): user = parts[i + 1] if "from" in parts: i = parts.index("from") if i + 1 < len(parts): ip = parts[i + 1] if "ssh2:" in parts: i = parts.index("ssh2:") if i + 1 < len(parts): key_type = parts[i + 1] return f"{month} {day} {time_str} {user} @ {ip} ({key_type})" @dp.message(F.text == "📈 Metrics") async def metrics(msg: Message): if not is_admin_msg(msg): return if state.METRICS_STORE is None: await msg.answer("⚠️ Metrics not initialized", reply_markup=system_info_kb) return await msg.answer(summarize(state.METRICS_STORE, minutes=15), reply_markup=system_info_kb) @dp.message(F.text == "🧪 SMART test") async def smart_test(msg: Message): if not is_admin_msg(msg): return disks_list = list_disks() if not disks_list: await msg.answer("💽 Disks\n\n❌ No disks found", reply_markup=system_info_kb) return await msg.answer("🧪 Starting SMART short tests…", reply_markup=system_info_kb) async def worker(): lines = ["🧪 SMART short tests\n"] for dev in disks_list: rc, out = await run_cmd(["sudo", "smartctl", "-t", "short", dev], timeout=20) if rc != 0: lines.append(f"🔴 {dev}: {out.strip() or 'error'}") continue summary = "started" for line in out.splitlines(): if "Please wait" in line or "will complete" in line: summary = line.strip() break lines.append(f"🟢 {dev}: {summary}") await msg.answer("\n".join(lines), reply_markup=system_info_kb) asyncio.create_task(worker()) @dp.message(F.text == "🧪 SMART status") async def smart_status(msg: Message): if not is_admin_msg(msg): return disks_list = list_disks() if not disks_list: await msg.answer("💽 Disks\n\n❌ No disks found", reply_markup=system_info_kb) return lines = ["🧪 SMART last tests\n"] for dev in disks_list: last = smart_last_test(dev) lines.append(f"{dev}: {last}") await msg.answer("\n".join(lines), reply_markup=system_info_kb) @dp.message(F.text.in_({"/openwrt", "📡 Full status"})) async def openwrt_status(msg: Message): if not is_admin_msg(msg): return await msg.answer("⏳ Checking OpenWrt…", reply_markup=openwrt_kb) async def worker(): try: text = await get_openwrt_status(cfg) except Exception as e: text = f"⚠️ OpenWrt error: {e}" await msg.answer(text, reply_markup=openwrt_kb) asyncio.create_task(worker()) @dp.message(F.text == "/openwrt_wan") async def openwrt_wan(msg: Message): if not is_admin_msg(msg): return await msg.answer("⏳ Checking OpenWrt WAN…", reply_markup=openwrt_kb) async def worker(): try: text = await get_openwrt_status(cfg, mode="wan") except Exception as e: text = f"⚠️ OpenWrt error: {e}" await msg.answer(text, reply_markup=openwrt_kb) asyncio.create_task(worker()) @dp.message(F.text == "📡 OpenWrt") async def openwrt_menu(msg: Message): if not is_admin_msg(msg): return await msg.answer("📡 OpenWrt menu", reply_markup=openwrt_kb) @dp.message(F.text.in_({"/openwrt_clients", "📶 Wi-Fi clients"})) async def openwrt_clients(msg: Message): if not is_admin_msg(msg): return await msg.answer("⏳ Checking OpenWrt clients…", reply_markup=openwrt_kb) async def worker(): try: text = await get_openwrt_status(cfg, mode="clients") except Exception as e: text = f"⚠️ OpenWrt error: {e}" await msg.answer(text, reply_markup=openwrt_kb) asyncio.create_task(worker()) @dp.message(F.text.in_({"/openwrt_leases", "🧾 Leases"})) async def openwrt_leases(msg: Message): if not is_admin_msg(msg): return await msg.answer("⏳ Checking OpenWrt leases…", reply_markup=openwrt_kb) async def worker(): try: text = await get_openwrt_status(cfg, mode="leases") except Exception as e: text = f"⚠️ OpenWrt error: {e}" await msg.answer(text, reply_markup=openwrt_kb) asyncio.create_task(worker()) @dp.message(F.text == "/openwrt_fast") @dp.message(F.text == "🌐 WAN fast") async def openwrt_fast(msg: Message): if not is_admin_msg(msg): return await msg.answer("⏳ OpenWrt fast…", reply_markup=openwrt_kb) async def worker(): try: text = await get_openwrt_status(cfg, mode="wan") except Exception as e: text = f"⚠️ OpenWrt error: {e}" await msg.answer(text, reply_markup=openwrt_kb) asyncio.create_task(worker()) @dp.message(F.text == "🧾 Audit") async def audit_log(msg: Message): if not is_admin_msg(msg): return text = read_audit_tail(cfg, limit=200) if text.startswith("⚠️") or text.startswith("ℹ️"): await msg.answer(text, reply_markup=system_logs_audit_kb) else: await msg.answer(text, reply_markup=system_logs_audit_kb, parse_mode="Markdown") @dp.message(F.text == "🌍 External") async def external_checks(msg: Message): if not is_admin_msg(msg): return await msg.answer(format_report(cfg), reply_markup=system_logs_security_kb) @dp.message(F.text == "📣 Incidents") async def incidents(msg: Message): if not is_admin_msg(msg): return path = incidents_path(cfg) if not os.path.exists(path): await msg.answer("⚠️ Incidents log not found", reply_markup=system_logs_audit_kb) return last_24h = read_recent(cfg, hours=24, limit=500) last_7d = read_recent(cfg, hours=24 * 7, limit=1000) recent = last_24h[-30:] if last_24h else ["(no incidents)"] body = "\n".join(recent) text = ( "📣 Incidents\n\n" f"Last 24h: {len(last_24h)}\n" f"Last 7d: {len(last_7d)}\n\n" "Recent (24h):\n" f"```\n{body}\n```" ) await msg.answer(text, reply_markup=system_logs_audit_kb, parse_mode="Markdown") @dp.message(F.text == "🧾 Incidents") async def incidents_entry(msg: Message): if not is_admin_msg(msg): return await msg.answer( "📣 Incidents menu:\n" "- Summary\n- Diff\n- Heatmap\n- Export/All\n- Alerts log", reply_markup=system_logs_audit_kb, ) @dp.message(F.text.in_({"/incidents_summary", "📣 Summary"})) async def incidents_summary(msg: Message): if not is_admin_msg(msg): return last_24h = read_raw(cfg, hours=24, limit=2000) last_7d = read_raw(cfg, hours=24 * 7, limit=4000) def summarize(items): total = len(items) cats = {} suppressed = {} last_seen = {} for dt, msg in items: cat = infer_category(msg) or "n/a" cats[cat] = cats.get(cat, 0) + 1 last_seen[cat] = dt if "[suppressed" in msg.lower(): suppressed[cat] = suppressed.get(cat, 0) + 1 def fmt_top(d): return ", ".join(f"{k}:{v}" for k, v in sorted(d.items(), key=lambda x: x[1], reverse=True)[:5]) or "n/a" top = fmt_top(cats) top_supp = fmt_top(suppressed) last_parts = [] for k, dt in sorted(last_seen.items(), key=lambda x: x[1], reverse=True)[:5]: last_parts.append(f"{k}:{dt.astimezone().strftime('%Y-%m-%d %H:%M')}") last_str = ", ".join(last_parts) or "n/a" return total, top, top_supp, last_str t24, top24, supp24, last24 = summarize(last_24h) t7, top7, supp7, last7 = summarize(last_7d) text = ( "📣 Incidents summary\n\n" f"24h: {t24} (top: {top24}; suppressed: {supp24}; last: {last24})\n" f"7d: {t7} (top: {top7}; suppressed: {supp7}; last: {last7})" ) await msg.answer(text, reply_markup=system_logs_kb) @dp.message(F.text.startswith("/incidents_diff")) @dp.message(F.text == "🆕 Diff") async def incidents_diff(msg: Message): if not is_admin_msg(msg): return try: parts = msg.text.split() hours = 24 reset = False if len(parts) >= 2: if parts[1].lower() in {"reset", "clear"}: reset = True else: try: hours = max(1, int(parts[1])) except ValueError: hours = 24 if reset: runtime_state.set_state("incidents_diff_last_ts", None) await msg.answer( "📣 Diff marker reset. Next run will show all within window.", reply_markup=system_logs_audit_kb, ) return last_iso = runtime_state.get("incidents_diff_last_ts") last_dt = None if isinstance(last_iso, str): try: last_dt = datetime.fromisoformat(last_iso) except Exception: last_dt = None rows = read_raw(cfg, hours=hours, limit=5000, include_old=True) def collect(from_dt): fresh: list[tuple[datetime, str]] = [] for dt, line in rows: if from_dt and dt <= from_dt: continue fresh.append((dt, line)) return fresh fresh = collect(last_dt) # auto-reset if marker is ahead of all rows if not fresh and last_dt and rows and last_dt >= rows[-1][0]: last_dt = None runtime_state.set_state("incidents_diff_last_ts", None) fresh = collect(None) def fmt_lines(items: list[tuple[datetime, str]], limit_chars: int = 3500) -> str: buf = [] total_len = 0 for dt, line in items: entry = f"{dt.astimezone().strftime('%m-%d %H:%M')} {line}" if total_len + len(entry) + 1 > limit_chars: break buf.append(entry) total_len += len(entry) + 1 return "\n".join(buf) if not fresh: note = f"since {last_dt.astimezone().strftime('%Y-%m-%d %H:%M')}" if last_dt else "for the period" if rows: sample = rows[-50:][::-1] # newest first body = fmt_lines(sample) await msg.answer( f"📣 No new incidents {note} (window {hours}h).\nRecent sample:\n```\n{body}\n```", reply_markup=system_logs_audit_kb, parse_mode="Markdown", ) else: await msg.answer( f"📣 No new incidents {note} (window {hours}h)", reply_markup=system_logs_audit_kb, ) return fresh.sort(key=lambda x: x[0], reverse=True) body = fmt_lines(fresh) await msg.answer( f"📣 New incidents (since last mark, window {hours}h): {len(fresh)}\n```\n{body}\n```", reply_markup=system_logs_audit_kb, parse_mode="Markdown", ) try: runtime_state.set_state("incidents_diff_last_ts", fresh[0][0].isoformat()) except Exception: pass except Exception as e: await msg.answer(f"⚠️ Diff error: {type(e).__name__}: {e}", reply_markup=system_logs_audit_kb) @dp.message(F.text.startswith("/alerts_heatmap")) @dp.message(F.text == "🔥 Heatmap") async def alerts_heatmap(msg: Message): if not is_admin_msg(msg): return hours = 48 category = None if msg.text.startswith("/alerts_heatmap"): parts = msg.text.split() for part in parts[1:]: if part.isdigit(): hours = max(1, int(part)) else: category = part.lower() rows = read_raw(cfg, hours=hours, limit=8000, include_old=True) buckets: dict[datetime, int] = {} for dt, line in rows: cat = infer_category(line) or "n/a" if category and cat != category: continue local = dt.astimezone() hour = local.replace(minute=0, second=0, microsecond=0) buckets[hour] = buckets.get(hour, 0) + 1 if not buckets: await msg.answer(f"⚠️ No incidents for heatmap (hours={hours}, category={category or 'any'})") return start = datetime.now().astimezone() - timedelta(hours=hours - 1) timeline = [] current = start.replace(minute=0, second=0, microsecond=0) while current <= datetime.now().astimezone(): timeline.append((current, buckets.get(current, 0))) current += timedelta(hours=1) max_count = max(c for _, c in timeline) or 1 lines = [f"📊 Alerts heatmap (hours={hours}, category={category or 'any'})"] for ts, cnt in timeline[-72:]: bar_len = max(1, int(cnt / max_count * 12)) if cnt else 0 bar = "▇" * bar_len if cnt else "" lines.append(f"{ts:%m-%d %H}: {cnt:>3} {bar}") await msg.answer("\n".join(lines), reply_markup=system_logs_audit_kb) @dp.message(F.text == "/disk_snapshot") async def disk_snapshot(msg: Message): if not is_admin_msg(msg): return usage, mount = worst_disk_usage() mount = mount or "/" try: report = await build_disk_report(cfg, mount, usage or 0) except Exception as e: await msg.answer(f"⚠️ Disk snapshot error: {e}") return await msg.answer(f"💽 Disk snapshot ({mount})\n\n{report}", reply_markup=system_info_kb) @dp.message(F.text.startswith("/alerts_log")) async def alerts_log(msg: Message): if not is_admin_msg(msg): return parts = msg.text.split() hours = 24 if len(parts) >= 2: try: hours = max(1, int(parts[1])) except ValueError: hours = 24 rows = read_raw(cfg, hours=hours, limit=2000) suppressed = [(dt, m) for dt, m in rows if "[suppressed" in m.lower()] sent = [(dt, m) for dt, m in rows if "[suppressed" not in m.lower()] lines = [f"📣 Alerts log ({hours}h)"] lines.append(f"Sent: {len(sent)}, Suppressed: {len(suppressed)}") if suppressed: lines.append("\nSuppressed:") for dt, m in suppressed[-20:]: lines.append(f"{dt:%m-%d %H:%M} {m}") await msg.answer("\n".join(lines), reply_markup=system_logs_audit_kb) @dp.message(F.text.startswith("/incidents_export")) @dp.message(F.text == "📤 Export") async def incidents_export(msg: Message): if not is_admin_msg(msg): return parts = msg.text.split() hours = 24 fmt = "csv" if len(parts) >= 2: try: hours = max(1, int(parts[1])) except ValueError: hours = 24 if len(parts) >= 3: fmt = parts[2].lower() rows = read_raw(cfg, hours=hours, limit=20000, include_old=False) data = [] for dt, msg_line in rows: data.append({ "timestamp": dt.astimezone().isoformat(), "category": infer_category(msg_line) or "n/a", "message": msg_line, }) if fmt == "json": payload = json.dumps(data, ensure_ascii=False, indent=2) file_bytes = payload.encode("utf-8") fname = f"incidents_{hours}h.json" else: sio = io.StringIO() writer = csv.DictWriter(sio, fieldnames=["timestamp", "category", "message"], delimiter=";") writer.writeheader() for row in data: writer.writerow(row) file_bytes = sio.getvalue().encode("utf-8") fname = f"incidents_{hours}h.csv" summary = f"📤 Incidents export ({hours}h): {len(data)} rows, format {fmt}" await msg.answer(summary) await msg.answer_document(document=BufferedInputFile(file_bytes, filename=fname)) @dp.message(F.text.in_({"/backup_sla", "📉 Backup SLA", "Backup SLA"})) @dp.message(F.text.contains("Backup SLA")) @dp.message(F.text.regexp(r"(?i)backup.*sla|sla.*backup")) @dp.message(F.text.func(lambda t: isinstance(t, str) and "backup" in t.lower() and "sla" in t.lower())) async def backup_sla(msg: Message): if not is_admin_msg(msg): return await msg.answer("⏳ Checking Backup SLA…", reply_markup=backup_kb) rc, out = await run_cmd_full(["restic", "snapshots", "--json"], use_restic_env=True, timeout=40) if rc != 0: await msg.answer(f"⚠️ Restic error: {out.strip() or rc}", reply_markup=backup_kb) return try: snaps = json.loads(out) except Exception as e: await msg.answer(f"⚠️ Invalid restic JSON: {e}", reply_markup=backup_kb) return if not isinstance(snaps, list) or not snaps: await msg.answer("⚠️ No snapshots found", reply_markup=backup_kb) return snaps.sort(key=lambda s: s.get("time", ""), reverse=True) last_time_raw = snaps[0].get("time") if last_time_raw: last_dt = datetime.fromisoformat(last_time_raw.replace("Z", "+00:00")) age_h = (datetime.now(tz=last_dt.tzinfo) - last_dt).total_seconds() / 3600 else: last_dt = None age_h = None sla = float(cfg.get("backup", {}).get("sla_hours", 26)) if age_h is None: status = "🟡" elif age_h <= sla: status = "🟢" elif age_h <= sla * 1.5: status = "🟡" else: status = "🔴" last_str = last_dt.astimezone().strftime("%Y-%m-%d %H:%M") if last_dt else "n/a" age_str = f"{age_h:.1f}h" if age_h is not None else "n/a" await msg.answer( f"{status} Backup SLA\n" f"Snapshots: {len(snaps)}\n" f"Last: {last_str} (age {age_str})\n" f"SLA: {sla}h", reply_markup=backup_kb, ) @dp.message(F.text.startswith("/docker_restarts")) @dp.message(F.text == "♻️ Restarts") async def docker_restarts(msg: Message): if not is_admin_msg(msg): return parts = msg.text.split() hours = 168 if len(parts) >= 2: try: hours = max(1, int(parts[1])) except ValueError: hours = 168 rows = read_raw(cfg, hours=hours, limit=5000, include_old=True) restarts = [(dt, line) for dt, line in rows if "docker_restart" in line] if not restarts: await msg.answer(f"🐳 No docker restarts in last {hours}h", reply_markup=docker_kb) return counts: dict[str, int] = {} for _dt, line in restarts: parts_line = line.split() if len(parts_line) >= 2: alias = parts_line[-1] else: alias = "unknown" counts[alias] = counts.get(alias, 0) + 1 top = ", ".join(f"{k}:{v}" for k, v in sorted(counts.items(), key=lambda x: x[1], reverse=True)) body = "\n".join(f"{dt.astimezone().strftime('%m-%d %H:%M')} {line}" for dt, line in restarts[-50:]) await msg.answer( f"🐳 Docker restarts ({hours}h): {len(restarts)} ({top})\n```\n{body}\n```", reply_markup=docker_kb, parse_mode="Markdown", ) @dp.message(F.text.startswith("/openwrt_leases_diff")) @dp.message(F.text == "🔀 Leases diff") async def openwrt_leases_diff(msg: Message): if not is_admin_msg(msg): return await msg.answer("⏳ OpenWrt leases diff…", reply_markup=openwrt_kb) async def worker(): try: leases_now = await fetch_openwrt_leases(cfg) except Exception as e: await msg.answer(f"⚠️ OpenWrt error: {e}", reply_markup=openwrt_kb) return prev = runtime_state.get("openwrt_leases_prev", []) if not prev: runtime_state.set_state("openwrt_leases_prev", leases_now) await msg.answer(f"Baseline saved ({len(leases_now)} leases)", reply_markup=openwrt_kb) return prev_set = set(prev) now_set = set(leases_now) added = sorted(list(now_set - prev_set)) removed = sorted(list(prev_set - now_set)) lines = [f"🧾 OpenWrt leases diff ({len(added)} added, {len(removed)} removed)"] if added: lines.append("➕ Added:") lines.extend([f" - {x}" for x in added[:50]]) if removed: lines.append("➖ Removed:") lines.extend([f" - {x}" for x in removed[:50]]) if len(added) > 50 or len(removed) > 50: lines.append("… trimmed") runtime_state.set_state("openwrt_leases_prev", leases_now) await msg.answer("\n".join(lines), reply_markup=openwrt_kb) asyncio.create_task(worker()) @dp.message(F.text.in_({"/queue_sla", "📊 Queue SLA", "Queue SLA"})) @dp.message(F.text.contains("Queue SLA")) @dp.message(F.text.regexp(r"(?i)queue.*sla|sla.*queue")) @dp.message(F.text.func(lambda t: isinstance(t, str) and "queue" in t.lower() and "sla" in t.lower())) async def queue_sla(msg: Message): if not is_admin_msg(msg): return await msg.answer("⏳ Calculating Queue SLA…", reply_markup=backup_kb) hist = get_history_raw() if not hist: await msg.answer("🧾 Queue SLA: history is empty", reply_markup=backup_kb) return waits = [item.get("wait_sec", 0) for item in hist] runs = [item.get("runtime_sec", 0) for item in hist] def p95(values: list[int]) -> float: if not values: return 0.0 vals = sorted(values) k = int(0.95 * (len(vals) - 1)) return float(vals[k]) lines = [ "🧾 Queue SLA", f"Samples: {len(hist)}", f"Max wait: {max(waits)}s, p95 {p95(waits):.1f}s", f"Max run: {max(runs)}s, p95 {p95(runs):.1f}s", ] stats = get_stats() if stats: lines.append( f"Avg wait: {stats.get('avg_wait_sec', 0):.1f}s, " f"avg run: {stats.get('avg_runtime_sec', 0):.1f}s" ) await msg.answer("\n".join(lines), reply_markup=backup_kb) # Fallback router: any message with "sla" is dispatched to backup or queue SLA. @dp.message(F.text.regexp(r"(?i)sla")) async def sla_fallback(msg: Message): if not is_admin_msg(msg): return text = msg.text or "" tl = text.lower() if "queue" in tl: await queue_sla(msg) elif "backup" in tl: await backup_sla(msg) @dp.message(F.text == "/selftest_history") async def selftest_history(msg: Message): if not is_admin_msg(msg): return hist = runtime_state.get("selftest_history", []) if not hist: await msg.answer("🧪 Self-test history is empty", reply_markup=system_logs_audit_kb) return lines = ["🧪 Self-test history"] for entry in hist[:15]: ts = entry.get("ts", "")[:16] ok = entry.get("ok", False) emoji = "🟢" if ok else "🔴" summary = entry.get("summary", "") lines.append(f"{emoji} {ts} {summary}") await msg.answer("\n".join(lines), reply_markup=system_logs_audit_kb) @dp.message(F.text.startswith("/export_all")) @dp.message(F.text == "📦 Export all") async def export_all(msg: Message): if not is_admin_msg(msg): return hours = 168 parts = msg.text.split() if len(parts) >= 2: try: hours = max(1, int(parts[1])) except ValueError: hours = 168 incidents_rows = read_raw(cfg, hours=hours, limit=20000, include_old=True) inc_sio = io.StringIO() inc_writer = csv.writer(inc_sio, delimiter=";") inc_writer.writerow(["timestamp", "category", "message"]) for dt, line in incidents_rows: inc_writer.writerow([dt.astimezone().isoformat(), infer_category(line) or "n/a", line]) queue_rows = get_history_raw() q_sio = io.StringIO() q_writer = csv.writer(q_sio, delimiter=";") q_writer.writerow(["finished_at", "label", "status", "wait_sec", "runtime_sec"]) for item in queue_rows: q_writer.writerow( [ datetime.fromtimestamp(item.get("finished_at", 0)).isoformat() if item.get("finished_at") else "", item.get("label", ""), item.get("status", ""), item.get("wait_sec", ""), item.get("runtime_sec", ""), ] ) st_hist = runtime_state.get("selftest_history", []) st_sio = io.StringIO() st_writer = csv.writer(st_sio, delimiter=";") st_writer.writerow(["timestamp", "ok", "summary"]) for entry in st_hist: st_writer.writerow([entry.get("ts", ""), entry.get("ok", False), entry.get("summary", "")]) mem_zip = io.BytesIO() with zipfile.ZipFile(mem_zip, "w", compression=zipfile.ZIP_DEFLATED) as zf: zf.writestr(f"incidents_{hours}h.csv", inc_sio.getvalue()) zf.writestr("queue_history.csv", q_sio.getvalue()) zf.writestr("selftest_history.csv", st_sio.getvalue()) mem_zip.seek(0) summary = ( f"📦 Export all\n" f"Incidents: {len(incidents_rows)} rows ({hours}h)\n" f"Queue history: {len(queue_rows)} rows\n" f"Selftest: {len(st_hist)} rows" ) await msg.answer(summary) await msg.answer_document(document=BufferedInputFile(mem_zip.read(), filename="export_all.zip")) @dp.message(F.text == "🔒 SSL") async def ssl_certs(msg: Message): if not is_admin_msg(msg): return await msg.answer("⏳ Checking SSL certificates…", reply_markup=system_logs_security_kb) async def worker(): try: certs = await asyncio.to_thread(fetch_certificates, cfg) text = format_certificates(certs) except Exception as e: text = f"⚠️ NPMplus error: {e}" await msg.answer(text, reply_markup=system_logs_security_kb) asyncio.create_task(worker()) @dp.message(F.text == "🍵 Gitea") async def gitea_health(msg: Message): if not is_admin_msg(msg): return await msg.answer("⏳ Checking Gitea health…", reply_markup=system_logs_integrations_kb) async def worker(): try: text = await asyncio.to_thread(get_gitea_health, cfg) except Exception as e: text = f"⚠️ Gitea error: {e}" await msg.answer(text, reply_markup=system_logs_integrations_kb) asyncio.create_task(worker()) @dp.message(F.text == "🧩 NPMplus") async def npmplus_hosts(msg: Message): if not is_admin_msg(msg): return await msg.answer("⏳ Loading NPMplus hosts…", reply_markup=system_logs_integrations_kb) async def worker(): try: hosts = await asyncio.to_thread(list_proxy_hosts, cfg) except Exception as e: await msg.answer(f"⚠️ NPMplus error: {e}", reply_markup=system_logs_integrations_kb) return if not hosts: await msg.answer("🧩 NPMplus\n\n(no proxy hosts)", reply_markup=system_logs_integrations_kb) return lines = ["🧩 NPMplus proxy hosts\n"] rows = [] for h in hosts[:10]: hid = h.get("id") domains = h.get("domain_names") or [] name = ", ".join(domains) if isinstance(domains, list) else str(domains) forward = f"{h.get('forward_host')}:{h.get('forward_port')}" enabled = h.get("enabled", True) icon = "🟢" if enabled else "🔴" lines.append(f"{icon} {name} → {forward}") if hid is not None: action = "disable" if enabled else "enable" rows.append([ InlineKeyboardButton( text=f"{('⛔' if enabled else '✅')} {name[:12]}", callback_data=f"npmplus:{action}:{hid}" ) ]) kb = InlineKeyboardMarkup(inline_keyboard=rows) await msg.answer("\n".join(lines), reply_markup=kb) asyncio.create_task(worker()) @dp.message(F.text == "📦 Updates") async def updates_list(msg: Message): if not is_admin_msg(msg): return async def job(): title, lines = await list_updates() UPDATES_CACHE[msg.from_user.id] = { "title": title, "lines": lines, "page_size": 20, } await send_updates_page(msg, msg.from_user.id, 0, edit=False) pos = await enqueue("pkg-updates", job) await msg.answer(f"🕓 Updates queued (#{pos})", reply_markup=system_ops_kb) @dp.message(F.text == "⬆️ Upgrade") async def updates_apply(msg: Message): if not is_admin_msg(msg): return kb = InlineKeyboardMarkup( inline_keyboard=[[ InlineKeyboardButton(text="✅ Confirm", callback_data="upgrade:confirm"), InlineKeyboardButton(text="✖ Cancel", callback_data="upgrade:cancel"), ]] ) await msg.answer("⚠️ Confirm package upgrade?", reply_markup=kb) @dp.message(F.text == "🔄 Reboot") async def reboot_request(msg: Message): if not is_admin_msg(msg): return kb = InlineKeyboardMarkup( inline_keyboard=[[ InlineKeyboardButton(text="✅ Confirm", callback_data="reboot:confirm"), InlineKeyboardButton(text="✖ Cancel", callback_data="reboot:cancel"), ]] ) await msg.answer("⚠️ Confirm reboot?", reply_markup=kb) @dp.message(F.text == "🧱 Hardware") async def hw(msg: Message): if is_admin_msg(msg): await msg.answer(hardware(), reply_markup=system_info_kb) def _updates_kb(page: int, total_pages: int) -> InlineKeyboardMarkup: buttons = [] if total_pages > 1: row = [] if page > 0: row.append(InlineKeyboardButton(text="⬅️ Prev", callback_data=f"updpage:{page-1}")) if page < total_pages - 1: row.append(InlineKeyboardButton(text="Next ➡️", callback_data=f"updpage:{page+1}")) if row: buttons.append(row) return InlineKeyboardMarkup(inline_keyboard=buttons) async def send_updates_page(msg: Message, user_id: int, page: int, edit: bool): data = UPDATES_CACHE.get(user_id) if not data: await msg.answer("⚠️ Updates cache empty", reply_markup=system_ops_kb) return lines = data["lines"] page_size = data["page_size"] total_pages = max(1, (len(lines) + page_size - 1) // page_size) page = max(0, min(page, total_pages - 1)) start = page * page_size end = start + page_size body = "\n".join(lines[start:end]) text = f"{data['title']} (page {page+1}/{total_pages})\n```\n{body}\n```" if edit: await msg.edit_text(text, reply_markup=_updates_kb(page, total_pages), parse_mode="Markdown") else: await msg.answer(text, reply_markup=_updates_kb(page, total_pages), parse_mode="Markdown") @dp.callback_query(F.data.startswith("updpage:")) async def updates_page(cb: CallbackQuery): if cb.from_user.id not in UPDATES_CACHE: await cb.answer("No cached updates") return try: page = int(cb.data.split(":", 1)[1]) except ValueError: await cb.answer("Bad page") return await cb.answer() await send_updates_page(cb.message, cb.from_user.id, page, edit=True) @dp.callback_query(F.data == "upgrade:confirm") async def upgrade_confirm(cb: CallbackQuery): if cb.from_user.id != cfg["telegram"]["admin_id"]: return await cb.answer() async def job(): if cfg.get("safety", {}).get("dry_run", False): await cb.message.answer("🧪 Dry-run: upgrade skipped", reply_markup=system_ops_kb) return text = await apply_updates() await cb.message.answer(text, reply_markup=system_ops_kb, parse_mode="Markdown") pos = await enqueue("pkg-upgrade", job) await cb.message.answer(f"🕓 Upgrade queued (#{pos})", reply_markup=system_ops_kb) @dp.callback_query(F.data == "upgrade:cancel") async def upgrade_cancel(cb: CallbackQuery): await cb.answer("Cancelled") await cb.message.delete() @dp.callback_query(F.data == "reboot:confirm") async def reboot_confirm(cb: CallbackQuery): if cb.from_user.id != cfg["telegram"]["admin_id"]: return await cb.answer() REBOOT_PENDING[cb.from_user.id] = {} await cb.message.answer("🔐 Send reboot password", reply_markup=system_ops_kb) @dp.callback_query(F.data == "reboot:cancel") async def reboot_cancel(cb: CallbackQuery): await cb.answer("Cancelled") await cb.message.delete() @dp.callback_query(F.data.startswith("npmplus:")) async def npmplus_toggle(cb: CallbackQuery): if cb.from_user.id != cfg["telegram"]["admin_id"]: return parts = cb.data.split(":") if len(parts) != 3: await cb.answer("Bad request") return action, raw_id = parts[1], parts[2] try: host_id = int(raw_id) except ValueError: await cb.answer("Bad id") return enable = action == "enable" await cb.answer("Working…") ok, info = await asyncio.to_thread(set_proxy_host, cfg, host_id, enable) if ok: await cb.message.answer("✅ Updated", reply_markup=system_logs_integrations_kb) else: await cb.message.answer(f"❌ NPMplus error: {info}", reply_markup=system_logs_integrations_kb) @dp.message(F.text, F.func(lambda msg: msg.from_user and msg.from_user.id in REBOOT_PENDING)) async def reboot_password(msg: Message): if not is_admin_msg(msg): return REBOOT_PENDING.pop(msg.from_user.id, None) expected = cfg.get("security", {}).get("reboot_password") if not expected: await msg.answer("⚠️ Reboot password not configured", reply_markup=system_ops_kb) return if (msg.text or "").strip() != expected: await msg.answer("❌ Wrong password", reply_markup=system_ops_kb) return async def job(): if cfg.get("safety", {}).get("dry_run", False): await msg.answer("🧪 Dry-run: reboot skipped", reply_markup=system_ops_kb) return await msg.answer("🔄 Rebooting…", reply_markup=system_ops_kb) await run_cmd(["sudo", "reboot"], timeout=10) pos = await enqueue("reboot", job) await msg.answer(f"🕓 Reboot queued (#{pos})", reply_markup=system_ops_kb)