"""Telegram handlers for the system menus.

Registers message and callback-query handlers on ``dp`` for disk, security,
URL, SSH-log, metrics, SMART, OpenWrt, audit, incidents, SSL, Gitea, NPMplus,
package-update and reboot actions. Handler registration order is significant:
``reboot_password`` matches any text message while a reboot is pending, so it
must stay after the handlers that match a specific button text.
"""

import asyncio
import os
from aiogram import F
from aiogram.types import Message, CallbackQuery, InlineKeyboardMarkup, InlineKeyboardButton
from app import dp, cfg
from auth import is_admin_msg
from keyboards import (
    system_info_kb,
    system_ops_kb,
    system_logs_audit_kb,
    system_logs_security_kb,
    system_logs_integrations_kb,
)
from system_checks import security, disks, hardware, list_disks, smart_last_test
from services.http_checks import get_url_checks, check_url
from services.queue import enqueue
from services.updates import list_updates, apply_updates
from services.runner import run_cmd
from services.npmplus import fetch_certificates, format_certificates, list_proxy_hosts, set_proxy_host
from services.gitea import get_gitea_health
from services.openwrt import get_openwrt_status
import state
from state import UPDATES_CACHE, REBOOT_PENDING
from services.metrics import summarize
from services.audit import read_audit_tail
from services.incidents import read_recent, incidents_path
from services.external_checks import format_report


# Strong references to in-flight background tasks. The event loop itself only
# keeps weak references, so a task created with a bare ``create_task`` call
# whose result is dropped may be garbage-collected before it completes.
_BACKGROUND_TASKS: set[asyncio.Task] = set()


def _spawn(coro) -> asyncio.Task:
    """Schedule *coro* as a background task and keep it alive until done.

    The task is stored in ``_BACKGROUND_TASKS`` and removed again by a done
    callback, so the set never grows beyond the tasks currently running.
    """
    task = asyncio.create_task(coro)
    _BACKGROUND_TASKS.add(task)
    task.add_done_callback(_BACKGROUND_TASKS.discard)
    return task


def _is_admin_cb(cb: CallbackQuery) -> bool:
    """Return True when the callback was sent by the configured admin id."""
    return cb.from_user.id == cfg["telegram"]["admin_id"]


@dp.message(F.text == "💽 Disks")
async def sd(msg: Message):
    """Reply with the ``disks()`` report (admin only)."""
    if is_admin_msg(msg):
        await msg.answer(disks(), reply_markup=system_info_kb)


@dp.message(F.text == "🔐 Security")
async def sec(msg: Message):
    """Reply with the ``security()`` report (admin only)."""
    if is_admin_msg(msg):
        await msg.answer(security(), reply_markup=system_info_kb)


@dp.message(F.text == "🌐 URLs")
async def urls(msg: Message):
    """Check every configured URL in worker threads and report the results.

    Sends a "checking" notice immediately; the checks themselves run in a
    background task so the handler returns without waiting for them.
    """
    if not is_admin_msg(msg):
        return
    checks = list(get_url_checks(cfg))
    if not checks:
        await msg.answer("⚠️ Нет URL для проверки", reply_markup=system_logs_security_kb)
        return
    await msg.answer("⏳ Проверяю URL…", reply_markup=system_logs_security_kb)

    async def worker():
        tasks = [asyncio.to_thread(check_url, url) for _, url in checks]
        results = await asyncio.gather(*tasks)
        lines = ["🌐 URLs\n"]
        for (alias, _url), (ok, status, ms, err) in zip(checks, results):
            if ok:
                lines.append(f"🟢 {alias}: {status} ({ms}ms)")
            elif status is not None:
                lines.append(f"🔴 {alias}: {status} ({ms}ms)")
            else:
                # No HTTP status at all: show the error text instead.
                reason = err or "error"
                lines.append(f"🔴 {alias}: {reason} ({ms}ms)")
        await msg.answer("\n".join(lines), reply_markup=system_logs_security_kb)

    _spawn(worker())


def _accepted_lines(output: str) -> list[str]:
    """Return the lines of *output* that contain the word "Accepted"."""
    return [line for line in output.splitlines() if "Accepted" in line]


@dp.message(F.text == "🔑 SSH log")
async def ssh_log(msg: Message):
    """Report today's accepted SSH logins (last 30 entries).

    Reads the journal for the ``sshd``/``sshd-session`` tags first and falls
    back to the tail of ``/var/log/auth.log`` when that yields nothing.
    """
    if not is_admin_msg(msg):
        return
    await msg.answer("⏳ Loading SSH logins…", reply_markup=system_logs_security_kb)

    async def worker():
        raw_lines: list[str] = []
        rc, out = await run_cmd(
            [
                "journalctl",
                "-t",
                "sshd",
                "-t",
                "sshd-session",
                "--since",
                "00:00",
                "--no-pager",
            ],
            timeout=20,
        )
        if rc == 0 and out.strip():
            raw_lines.extend(_accepted_lines(out))
        if not raw_lines:
            rc2, out2 = await run_cmd(["tail", "-n", "200", "/var/log/auth.log"], timeout=10)
            if rc2 == 0 and out2.strip():
                raw_lines.extend(_accepted_lines(out2))
        if not raw_lines:
            await msg.answer("🔑 SSH log\n\n(no logins today)", reply_markup=system_logs_security_kb)
            return
        recent = raw_lines[-30:]
        pretty = [_format_ssh_line(line) for line in recent]
        text = "🔑 SSH logins (today)\n```\n" + "\n".join(pretty) + "\n```"
        await msg.answer(text, reply_markup=system_logs_security_kb, parse_mode="Markdown")

    _spawn(worker())


def _token_after(parts: list[str], marker: str) -> str:
    """Return the token following the first *marker* in *parts*, or "?"."""
    if marker in parts:
        i = parts.index(marker)
        if i + 1 < len(parts):
            return parts[i + 1]
    return "?"


def _format_ssh_line(line: str) -> str:
    """Condense one "Accepted ..." log line to ``date time user @ ip (key)``.

    Lines with fewer than ten whitespace-separated tokens are returned
    unchanged. The user, address and key type are the tokens that follow
    ``for``, ``from`` and ``ssh2:`` respectively; a missing one becomes "?".
    """
    parts = line.split()
    if len(parts) < 10:
        return line
    month, day, time_str = parts[0], parts[1], parts[2]
    user = _token_after(parts, "for")
    ip = _token_after(parts, "from")
    key_type = _token_after(parts, "ssh2:")
    return f"{month} {day} {time_str} {user} @ {ip} ({key_type})"


@dp.message(F.text == "📈 Metrics")
async def metrics(msg: Message):
    """Reply with a 15-minute summary of ``state.METRICS_STORE`` (admin only)."""
    if not is_admin_msg(msg):
        return
    if state.METRICS_STORE is None:
        await msg.answer("⚠️ Metrics not initialized", reply_markup=system_info_kb)
        return
    await msg.answer(summarize(state.METRICS_STORE, minutes=15), reply_markup=system_info_kb)


@dp.message(F.text == "🧪 SMART test")
async def smart_test(msg: Message):
    """Start a SMART short test on every listed disk and report per-disk status."""
    if not is_admin_msg(msg):
        return
    disks_list = list_disks()
    if not disks_list:
        await msg.answer("💽 Disks\n\n❌ No disks found", reply_markup=system_info_kb)
        return
    await msg.answer("🧪 Starting SMART short tests…", reply_markup=system_info_kb)

    async def worker():
        lines = ["🧪 SMART short tests\n"]
        for dev in disks_list:
            rc, out = await run_cmd(["sudo", "smartctl", "-t", "short", dev], timeout=20)
            if rc != 0:
                lines.append(f"🔴 {dev}: {out.strip() or 'error'}")
                continue
            # Use the first smartctl line that mentions the expected wait,
            # falling back to a generic "started".
            summary = "started"
            for line in out.splitlines():
                if "Please wait" in line or "will complete" in line:
                    summary = line.strip()
                    break
            lines.append(f"🟢 {dev}: {summary}")
        await msg.answer("\n".join(lines), reply_markup=system_info_kb)

    _spawn(worker())


@dp.message(F.text == "🧪 SMART status")
async def smart_status(msg: Message):
    """Reply with ``smart_last_test`` output for every listed disk (admin only)."""
    if not is_admin_msg(msg):
        return
    disks_list = list_disks()
    if not disks_list:
        await msg.answer("💽 Disks\n\n❌ No disks found", reply_markup=system_info_kb)
        return
    lines = ["🧪 SMART last tests\n"]
    for dev in disks_list:
        last = smart_last_test(dev)
        lines.append(f"{dev}: {last}")
    await msg.answer("\n".join(lines), reply_markup=system_info_kb)


async def _send_openwrt_report(msg: Message, notice: str, **status_kwargs) -> None:
    """Send *notice*, then fetch and send the OpenWrt status in the background.

    ``status_kwargs`` are forwarded to ``get_openwrt_status`` after ``cfg``.
    Any exception from the fetch is reported to the chat instead of raised.
    """
    await msg.answer(notice, reply_markup=system_info_kb)

    async def worker():
        try:
            text = await get_openwrt_status(cfg, **status_kwargs)
        except Exception as e:
            text = f"⚠️ OpenWrt error: {e}"
        await msg.answer(text, reply_markup=system_info_kb)

    _spawn(worker())


@dp.message(F.text == "📡 OpenWrt")
async def openwrt_status(msg: Message):
    """Report the default OpenWrt status (admin only)."""
    if not is_admin_msg(msg):
        return
    await _send_openwrt_report(msg, "⏳ Checking OpenWrt…")


@dp.message(F.text == "/openwrt")
async def openwrt_cmd(msg: Message):
    """Command alias for the "📡 OpenWrt" button."""
    if not is_admin_msg(msg):
        return
    await openwrt_status(msg)


@dp.message(F.text == "/openwrt_wan")
async def openwrt_wan(msg: Message):
    """Report OpenWrt status with ``mode="wan"`` (admin only)."""
    if not is_admin_msg(msg):
        return
    await _send_openwrt_report(msg, "⏳ Checking OpenWrt WAN…", mode="wan")


@dp.message(F.text == "/openwrt_clients")
async def openwrt_clients(msg: Message):
    """Report OpenWrt status with ``mode="clients"`` (admin only)."""
    if not is_admin_msg(msg):
        return
    await _send_openwrt_report(msg, "⏳ Checking OpenWrt clients…", mode="clients")


@dp.message(F.text == "/openwrt_leases")
async def openwrt_leases(msg: Message):
    """Report OpenWrt status with ``mode="leases"`` (admin only)."""
    if not is_admin_msg(msg):
        return
    await _send_openwrt_report(msg, "⏳ Checking OpenWrt leases…", mode="leases")


@dp.message(F.text == "🧾 Audit")
async def audit_log(msg: Message):
    """Reply with the tail of the audit log (admin only).

    Texts starting with a warning/info emoji are sent without a parse mode;
    anything else is sent as Markdown.
    """
    if not is_admin_msg(msg):
        return
    text = read_audit_tail(cfg, limit=200)
    if text.startswith(("⚠️", "ℹ️")):
        await msg.answer(text, reply_markup=system_logs_audit_kb)
    else:
        await msg.answer(text, reply_markup=system_logs_audit_kb, parse_mode="Markdown")


@dp.message(F.text == "🌍 External")
async def external_checks(msg: Message):
    """Reply with ``format_report(cfg)`` (admin only)."""
    if not is_admin_msg(msg):
        return
    await msg.answer(format_report(cfg), reply_markup=system_logs_security_kb)


@dp.message(F.text == "📣 Incidents")
async def incidents(msg: Message):
    """Reply with 24h/7d incident counts and the last 30 entries from 24h."""
    if not is_admin_msg(msg):
        return
    path = incidents_path(cfg)
    if not os.path.exists(path):
        await msg.answer("⚠️ Incidents log not found", reply_markup=system_logs_audit_kb)
        return
    last_24h = read_recent(cfg, hours=24, limit=500)
    last_7d = read_recent(cfg, hours=24 * 7, limit=1000)
    recent = last_24h[-30:] if last_24h else ["(no incidents)"]
    body = "\n".join(recent)
    text = (
        "📣 Incidents\n\n"
        f"Last 24h: {len(last_24h)}\n"
        f"Last 7d: {len(last_7d)}\n\n"
        "Recent (24h):\n"
        f"```\n{body}\n```"
    )
    await msg.answer(text, reply_markup=system_logs_audit_kb, parse_mode="Markdown")


@dp.message(F.text == "🔒 SSL")
async def ssl_certs(msg: Message):
    """Fetch certificates in a worker thread and reply with the formatted list."""
    if not is_admin_msg(msg):
        return
    await msg.answer("⏳ Checking SSL certificates…", reply_markup=system_logs_security_kb)

    async def worker():
        try:
            certs = await asyncio.to_thread(fetch_certificates, cfg)
            text = format_certificates(certs)
        except Exception as e:
            text = f"⚠️ NPMplus error: {e}"
        await msg.answer(text, reply_markup=system_logs_security_kb)

    _spawn(worker())


@dp.message(F.text == "🍵 Gitea")
async def gitea_health(msg: Message):
    """Run ``get_gitea_health`` in a worker thread and reply with its text."""
    if not is_admin_msg(msg):
        return
    await msg.answer("⏳ Checking Gitea health…", reply_markup=system_logs_integrations_kb)

    async def worker():
        try:
            text = await asyncio.to_thread(get_gitea_health, cfg)
        except Exception as e:
            text = f"⚠️ Gitea error: {e}"
        await msg.answer(text, reply_markup=system_logs_integrations_kb)

    _spawn(worker())


@dp.message(F.text == "🧩 NPMplus")
async def npmplus_hosts(msg: Message):
    """List the first ten NPMplus proxy hosts with enable/disable buttons.

    Each host that has an ``id`` gets an inline button whose callback data is
    ``npmplus:<enable|disable>:<id>``; it is handled by ``npmplus_toggle``.
    """
    if not is_admin_msg(msg):
        return
    await msg.answer("⏳ Loading NPMplus hosts…", reply_markup=system_logs_integrations_kb)

    async def worker():
        try:
            hosts = await asyncio.to_thread(list_proxy_hosts, cfg)
        except Exception as e:
            await msg.answer(f"⚠️ NPMplus error: {e}", reply_markup=system_logs_integrations_kb)
            return
        if not hosts:
            await msg.answer("🧩 NPMplus\n\n(no proxy hosts)", reply_markup=system_logs_integrations_kb)
            return
        lines = ["🧩 NPMplus proxy hosts\n"]
        rows = []
        for h in hosts[:10]:
            hid = h.get("id")
            domains = h.get("domain_names") or []
            name = ", ".join(domains) if isinstance(domains, list) else str(domains)
            forward = f"{h.get('forward_host')}:{h.get('forward_port')}"
            enabled = h.get("enabled", True)
            icon = "🟢" if enabled else "🔴"
            lines.append(f"{icon} {name} → {forward}")
            if hid is not None:
                # The button offers the opposite of the current state.
                action = "disable" if enabled else "enable"
                rows.append([
                    InlineKeyboardButton(
                        text=f"{('⛔' if enabled else '✅')} {name[:12]}",
                        callback_data=f"npmplus:{action}:{hid}"
                    )
                ])
        kb = InlineKeyboardMarkup(inline_keyboard=rows)
        await msg.answer("\n".join(lines), reply_markup=kb)

    _spawn(worker())


@dp.message(F.text == "📦 Updates")
async def updates_list(msg: Message):
    """Queue a job that lists package updates and sends the first page.

    The job stores the result in ``UPDATES_CACHE`` under the sender's user id
    so that ``updates_page`` can paginate it later.
    """
    if not is_admin_msg(msg):
        return

    async def job():
        title, lines = await list_updates()
        UPDATES_CACHE[msg.from_user.id] = {
            "title": title,
            "lines": lines,
            "page_size": 20,
        }
        await send_updates_page(msg, msg.from_user.id, 0, edit=False)

    pos = await enqueue("pkg-updates", job)
    await msg.answer(f"🕓 Updates queued (#{pos})", reply_markup=system_ops_kb)


@dp.message(F.text == "⬆️ Upgrade")
async def updates_apply(msg: Message):
    """Ask the admin to confirm or cancel a package upgrade."""
    if not is_admin_msg(msg):
        return
    kb = InlineKeyboardMarkup(
        inline_keyboard=[[
            InlineKeyboardButton(text="✅ Confirm", callback_data="upgrade:confirm"),
            InlineKeyboardButton(text="✖ Cancel", callback_data="upgrade:cancel"),
        ]]
    )
    await msg.answer("⚠️ Confirm package upgrade?", reply_markup=kb)


@dp.message(F.text == "🔄 Reboot")
async def reboot_request(msg: Message):
    """Ask the admin to confirm or cancel a reboot."""
    if not is_admin_msg(msg):
        return
    kb = InlineKeyboardMarkup(
        inline_keyboard=[[
            InlineKeyboardButton(text="✅ Confirm", callback_data="reboot:confirm"),
            InlineKeyboardButton(text="✖ Cancel", callback_data="reboot:cancel"),
        ]]
    )
    await msg.answer("⚠️ Confirm reboot?", reply_markup=kb)


@dp.message(F.text == "🧱 Hardware")
async def hw(msg: Message):
    """Reply with the ``hardware()`` report (admin only)."""
    if is_admin_msg(msg):
        await msg.answer(hardware(), reply_markup=system_info_kb)


def _updates_kb(page: int, total_pages: int) -> InlineKeyboardMarkup:
    """Build the Prev/Next keyboard for page *page* (0-based) of *total_pages*.

    The keyboard is empty when there is only one page.
    """
    buttons = []
    if total_pages > 1:
        row = []
        if page > 0:
            row.append(InlineKeyboardButton(text="⬅️ Prev", callback_data=f"updpage:{page-1}"))
        if page < total_pages - 1:
            row.append(InlineKeyboardButton(text="Next ➡️", callback_data=f"updpage:{page+1}"))
        if row:
            buttons.append(row)
    return InlineKeyboardMarkup(inline_keyboard=buttons)


async def send_updates_page(msg: Message, user_id: int, page: int, edit: bool):
    """Send or edit-in-place one page of the cached updates list.

    Args:
        msg: Message to answer, or to edit when *edit* is true.
        user_id: Key into ``UPDATES_CACHE``.
        page: Requested 0-based page; clamped to the valid range.
        edit: Edit *msg* instead of sending a new message.
    """
    data = UPDATES_CACHE.get(user_id)
    if not data:
        await msg.answer("⚠️ Updates cache empty", reply_markup=system_ops_kb)
        return
    lines = data["lines"]
    page_size = data["page_size"]
    # Ceiling division, with at least one (possibly empty) page.
    total_pages = max(1, (len(lines) + page_size - 1) // page_size)
    page = max(0, min(page, total_pages - 1))
    start = page * page_size
    end = start + page_size
    body = "\n".join(lines[start:end])
    text = f"{data['title']} (page {page+1}/{total_pages})\n```\n{body}\n```"
    if edit:
        await msg.edit_text(text, reply_markup=_updates_kb(page, total_pages), parse_mode="Markdown")
    else:
        await msg.answer(text, reply_markup=_updates_kb(page, total_pages), parse_mode="Markdown")


@dp.callback_query(F.data.startswith("updpage:"))
async def updates_page(cb: CallbackQuery):
    """Switch the updates message to the page encoded in the callback data."""
    if cb.from_user.id not in UPDATES_CACHE:
        await cb.answer("No cached updates")
        return
    try:
        page = int(cb.data.split(":", 1)[1])
    except ValueError:
        await cb.answer("Bad page")
        return
    await cb.answer()
    await send_updates_page(cb.message, cb.from_user.id, page, edit=True)


@dp.callback_query(F.data == "upgrade:confirm")
async def upgrade_confirm(cb: CallbackQuery):
    """Queue the package upgrade job (skipped when ``safety.dry_run`` is set)."""
    if not _is_admin_cb(cb):
        return
    await cb.answer()

    async def job():
        if cfg.get("safety", {}).get("dry_run", False):
            await cb.message.answer("🧪 Dry-run: upgrade skipped", reply_markup=system_ops_kb)
            return
        text = await apply_updates()
        await cb.message.answer(text, reply_markup=system_ops_kb, parse_mode="Markdown")

    pos = await enqueue("pkg-upgrade", job)
    await cb.message.answer(f"🕓 Upgrade queued (#{pos})", reply_markup=system_ops_kb)


@dp.callback_query(F.data == "upgrade:cancel")
async def upgrade_cancel(cb: CallbackQuery):
    """Dismiss the upgrade confirmation prompt."""
    # Same guard as upgrade_confirm, so only the admin can delete the prompt.
    if not _is_admin_cb(cb):
        return
    await cb.answer("Cancelled")
    await cb.message.delete()


@dp.callback_query(F.data == "reboot:confirm")
async def reboot_confirm(cb: CallbackQuery):
    """Mark a reboot as pending for the admin and ask for the password."""
    if not _is_admin_cb(cb):
        return
    await cb.answer()
    REBOOT_PENDING[cb.from_user.id] = {}
    await cb.message.answer("🔐 Send reboot password", reply_markup=system_ops_kb)


@dp.callback_query(F.data == "reboot:cancel")
async def reboot_cancel(cb: CallbackQuery):
    """Dismiss the reboot confirmation prompt."""
    # Same guard as reboot_confirm, so only the admin can delete the prompt.
    if not _is_admin_cb(cb):
        return
    await cb.answer("Cancelled")
    await cb.message.delete()


@dp.callback_query(F.data.startswith("npmplus:"))
async def npmplus_toggle(cb: CallbackQuery):
    """Enable or disable an NPMplus proxy host from ``npmplus:<action>:<id>``."""
    if not _is_admin_cb(cb):
        return
    parts = cb.data.split(":")
    if len(parts) != 3:
        await cb.answer("Bad request")
        return
    action, raw_id = parts[1], parts[2]
    # Reject unknown actions rather than silently treating them as "disable".
    if action not in ("enable", "disable"):
        await cb.answer("Bad action")
        return
    try:
        host_id = int(raw_id)
    except ValueError:
        await cb.answer("Bad id")
        return
    enable = action == "enable"
    await cb.answer("Working…")
    try:
        ok, info = await asyncio.to_thread(set_proxy_host, cfg, host_id, enable)
    except Exception as e:
        # Report failures to the chat like the other NPMplus handlers do.
        ok, info = False, e
    if ok:
        await cb.message.answer("✅ Updated", reply_markup=system_logs_integrations_kb)
    else:
        await cb.message.answer(f"❌ NPMplus error: {info}", reply_markup=system_logs_integrations_kb)


@dp.message(F.text, F.func(lambda msg: msg.from_user and msg.from_user.id in REBOOT_PENDING))
async def reboot_password(msg: Message):
    """Treat the next text from a user with a pending reboot as the password.

    The pending entry is consumed on every attempt, so a wrong password
    requires going through the confirmation prompt again. On success the
    reboot is queued (or skipped when ``safety.dry_run`` is set).
    """
    if not is_admin_msg(msg):
        return
    REBOOT_PENDING.pop(msg.from_user.id, None)
    expected = cfg.get("security", {}).get("reboot_password")
    if not expected:
        await msg.answer("⚠️ Reboot password not configured", reply_markup=system_ops_kb)
        return
    if (msg.text or "").strip() != expected:
        await msg.answer("❌ Wrong password", reply_markup=system_ops_kb)
        return

    async def job():
        if cfg.get("safety", {}).get("dry_run", False):
            await msg.answer("🧪 Dry-run: reboot skipped", reply_markup=system_ops_kb)
            return
        await msg.answer("🔄 Rebooting…", reply_markup=system_ops_kb)
        await run_cmd(["sudo", "reboot"], timeout=10)

    pos = await enqueue("reboot", job)
    await msg.answer(f"🕓 Reboot queued (#{pos})", reply_markup=system_ops_kb)