#!/usr/bin/env python3 import asyncio import json import os import socket import time from datetime import datetime, timezone from pathlib import Path from typing import Dict, Optional from system_checks import security, disks import psutil import yaml from aiogram import Bot, Dispatcher, F from aiogram.types import ( Message, CallbackQuery, ReplyKeyboardMarkup, KeyboardButton, InlineKeyboardMarkup, InlineKeyboardButton, ) # ===================== CONFIG ===================== CONFIG_FILE = "/opt/tg-bot/config.yaml" def load_cfg(): with open(CONFIG_FILE) as f: return yaml.safe_load(f) def load_env(env_file: str) -> Dict[str, str]: env = {} with open(env_file) as f: for line in f: line = line.strip() if not line or line.startswith("#"): continue if line.startswith("export "): line = line[len("export "):] if "=" in line: k, v = line.split("=", 1) env[k] = v.strip().strip('"') return env cfg = load_cfg() TOKEN = cfg["telegram"]["token"] ADMIN_ID = cfg["telegram"]["admin_id"] ARTIFACT_STATE = cfg["paths"]["artifact_state"] RESTIC_ENV = load_env(cfg["paths"].get("restic_env", "/etc/restic/restic.env")) DISK_WARN = int(cfg.get("thresholds", {}).get("disk_warn", 80)) LOAD_WARN = float(cfg.get("thresholds", {}).get("load_warn", 2.0)) bot = Bot(TOKEN) dp = Dispatcher() DOCKER_MAP: Dict[str, str] = {} def is_admin(msg: Message) -> bool: return msg.from_user and msg.from_user.id == ADMIN_ID def container_uptime(started_at: str) -> str: """ started_at: 2026-02-06T21:14:33.123456789Z """ try: start = datetime.fromisoformat( started_at.replace("Z", "+00:00") ).astimezone(timezone.utc) delta = datetime.now(timezone.utc) - start days = delta.days hours = delta.seconds // 3600 minutes = (delta.seconds % 3600) // 60 if days > 0: return f"{days}d {hours}h" if hours > 0: return f"{hours}h {minutes}m" return f"{minutes}m" except Exception: return "unknown" def backup_badge(last_time: datetime) -> str: age = datetime.now(timezone.utc) - last_time hours = age.total_seconds() / 3600 if hours < 24: return "🟢 Backup: OK" if hours < 72: return "🟡 Backup: stale" return "🔴 Backup: OLD" def format_disks(): parts = psutil.disk_partitions(all=False) lines = [] skip_prefixes = ( "/snap", "/proc", "/sys", "/run", "/boot/efi", ) for p in parts: mp = p.mountpoint if mp.startswith(skip_prefixes): continue try: usage = psutil.disk_usage(mp) except PermissionError: continue icon = "🟢" if usage.percent > 90: icon = "🔴" elif usage.percent > 80: icon = "🟡" lines.append( f"{icon} **{mp}**: " f"{usage.used // (1024**3)} / {usage.total // (1024**3)} GiB " f"({usage.percent}%)" ) if not lines: return "💾 Disks: n/a" return "💾 **Disks**\n" + "\n".join(lines) # ===================== KEYBOARDS ===================== menu_kb = ReplyKeyboardMarkup( keyboard=[ [KeyboardButton(text="🩺 Health"), KeyboardButton(text="📊 Статус")], [KeyboardButton(text="🐳 Docker"), KeyboardButton(text="📦 Backup")], [KeyboardButton(text="🧊 Artifacts"), KeyboardButton(text="⚙️ System")], [KeyboardButton(text="ℹ️ Help")], ], resize_keyboard=True, ) docker_kb = ReplyKeyboardMarkup( keyboard=[ [KeyboardButton(text="🐳 Status")], [KeyboardButton(text="🔄 Restart"), KeyboardButton(text="📜 Logs")], [KeyboardButton(text="⬅️ Назад")], ], resize_keyboard=True, ) backup_kb = ReplyKeyboardMarkup( keyboard=[ [KeyboardButton(text="📦 Status")], [KeyboardButton(text="📊 Repo stats")], [KeyboardButton(text="▶️ Run backup")], [KeyboardButton(text="🧯 Restore help")], [KeyboardButton(text="⬅️ Назад")], ], resize_keyboard=True, ) artifacts_kb = ReplyKeyboardMarkup( keyboard=[ [KeyboardButton(text="🧊 Status")], [KeyboardButton(text="📤 Upload")], [KeyboardButton(text="⬅️ Назад")], ], resize_keyboard=True, ) system_kb = ReplyKeyboardMarkup( keyboard=[ [KeyboardButton(text="💽 Disks"), KeyboardButton(text="🔐 Security")], [KeyboardButton(text="🔄 Reboot")], [KeyboardButton(text="⬅️ Назад")], ], resize_keyboard=True, ) def docker_inline_kb(action: str) -> InlineKeyboardMarkup: rows = [] for alias in DOCKER_MAP.keys(): rows.append([ InlineKeyboardButton( text=alias, callback_data=f"docker:{action}:{alias}" ) ]) return InlineKeyboardMarkup(inline_keyboard=rows) # ===================== LOCKS ===================== LOCK_DIR = Path("/var/run/tg-bot") LOCK_DIR.mkdir(parents=True, exist_ok=True) def lock_path(name: str) -> Path: return LOCK_DIR / f"{name}.lock" def acquire_lock(name: str) -> bool: p = lock_path(name) if p.exists(): return False p.write_text(str(time.time())) return True def release_lock(name: str): p = lock_path(name) if p.exists(): p.unlink() # ===================== COMMAND RUNNER ===================== async def run_cmd(cmd: list[str], *, use_restic_env=False, timeout=60): env = os.environ.copy() env["PATH"] = "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin" if use_restic_env: env.update(RESTIC_ENV) proc = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT, env=env, ) try: out, _ = await asyncio.wait_for(proc.communicate(), timeout=timeout) return proc.returncode, out.decode(errors="ignore")[-3500:] except asyncio.TimeoutError: proc.kill() return 124, "❌ timeout" async def build_docker_map(cfg) -> Dict[str, str]: docker_cfg = cfg.get("docker", {}) result: Dict[str, str] = {} # 1. autodiscovery if docker_cfg.get("autodiscovery"): rc, raw = await run_cmd( ["sudo", "docker", "ps", "--format", "{{.Names}}"], timeout=20 ) if rc == 0: names = raw.splitlines() patterns = docker_cfg.get("match", []) for name in names: if any(p in name for p in patterns): result[name] = name # 2. aliases override aliases = docker_cfg.get("aliases", {}) for alias, real in aliases.items(): result[alias] = real return result async def get_last_snapshot() -> Optional[dict]: rc, raw = await run_cmd( ["restic", "snapshots", "--json"], use_restic_env=True, timeout=20 ) if rc != 0: return None snaps = json.loads(raw) if not snaps: return None snaps.sort(key=lambda s: s["time"], reverse=True) return snaps[0] # ===================== CORE COMMANDS ===================== async def cmd_repo_stats(msg: Message): await msg.answer("⏳ Loading repo stats…", reply_markup=backup_kb) # --- restore-size stats --- rc1, raw1 = await run_cmd( ["restic", "stats", "--json"], use_restic_env=True, timeout=30 ) if rc1 != 0: await msg.answer(raw1, reply_markup=backup_kb) return restore = json.loads(raw1) # --- raw-data stats --- rc2, raw2 = await run_cmd( ["restic", "stats", "--json", "--mode", "raw-data"], use_restic_env=True, timeout=30 ) if rc2 != 0: await msg.answer(raw2, reply_markup=backup_kb) return raw = json.loads(raw2) # --- snapshots count --- rc3, raw_snaps = await run_cmd( ["restic", "snapshots", "--json"], use_restic_env=True, timeout=20 ) snaps = len(json.loads(raw_snaps)) if rc3 == 0 else "n/a" msg_text = ( "📦 **Repository stats**\n\n" f"🧊 Snapshots: {snaps}\n" f"📁 Files: {restore.get('total_file_count', 'n/a')}\n" f"💾 Logical size: {restore.get('total_size', 0) / (1024**3):.2f} GiB\n" f"🧱 Stored data: {raw.get('total_pack_size', 0) / (1024**2):.2f} MiB\n" ) await msg.answer(msg_text, reply_markup=backup_kb, parse_mode="Markdown") async def discover_containers(cfg) -> Dict[str, str]: """ returns: alias -> real container name """ docker_cfg = cfg.get("docker", {}) result: Dict[str, str] = {} # --- autodiscovery --- if docker_cfg.get("autodiscovery"): rc, raw = await run_cmd( ["sudo", "docker", "ps", "--format", "{{.Names}}"], timeout=20 ) if rc == 0: found = raw.splitlines() label = docker_cfg.get("label") patterns = docker_cfg.get("match", []) for name in found: # label-based discovery if label: key, val = label.split("=", 1) rc2, lbl = await run_cmd([ "sudo", "docker", "inspect", "-f", f"{{{{ index .Config.Labels \"{key}\" }}}}", name ]) if rc2 == 0 and lbl.strip() == val: result[name] = name continue # name-pattern discovery if any(p in name for p in patterns): result[name] = name # --- manual aliases ALWAYS override --- aliases = docker_cfg.get("aliases", {}) for alias, real in aliases.items(): result[alias] = real return result async def cmd_status(msg: Message): now = time.time() uptime_sec = int(now - psutil.boot_time()) days, rem = divmod(uptime_sec, 86400) hours, rem = divmod(rem, 3600) minutes, _ = divmod(rem, 60) load1 = psutil.getloadavg()[0] cpu_icon = "🟢" if load1 > 2.0: cpu_icon = "🔴" elif load1 > 1.0: cpu_icon = "🟡" mem = psutil.virtual_memory() disks = format_disks() await msg.answer( "📊 **Server status**\n\n" f"🖥 **Host:** `{socket.gethostname()}`\n" f"⏱ **Uptime:** {days}d {hours}h {minutes}m\n" f"{cpu_icon} **Load (1m):** {load1:.2f}\n" f"🧠 **RAM:** {mem.used // (1024**3)} / {mem.total // (1024**3)} GiB ({mem.percent}%)\n\n" f"{disks}", reply_markup=menu_kb, parse_mode="Markdown", ) async def cmd_health(msg: Message): await msg.answer("⏳ Health-check…", reply_markup=menu_kb) async def worker(): lines = ["🩺 Health\n"] rc, _ = await run_cmd(["restic", "snapshots", "--latest", "1"], use_restic_env=True, timeout=20) lines.append("🟢 Backup repo OK" if rc == 0 else "🔴 Backup repo FAIL") bad = [] for alias, real in DOCKER_MAP.items(): rc2, state = await run_cmd(["sudo", "docker", "inspect", "-f", "{{.State.Status}}", real], timeout=10) if rc2 != 0 or state.strip() != "running": bad.append(alias) lines.append("🟢 Docker OK" if not bad else f"🔴 Docker down: {', '.join(bad)}") disk = psutil.disk_usage("/mnt/data") lines.append(("🟡" if disk.percent >= DISK_WARN else "🟢") + f" Disk {disk.percent}%") load = psutil.getloadavg()[0] lines.append(("🟡" if load >= LOAD_WARN else "🟢") + f" Load {load:.2f}") await msg.answer("\n".join(lines), reply_markup=menu_kb) asyncio.create_task(worker()) async def cmd_backup_status(msg: Message): await msg.answer("⏳ Loading snapshots…", reply_markup=backup_kb) async def worker(): rc, raw = await run_cmd( ["restic", "snapshots", "--json"], use_restic_env=True, timeout=30 ) if rc != 0: await msg.answer(raw, reply_markup=backup_kb) return snaps = json.loads(raw) if not snaps: await msg.answer("📦 Snapshots: none", reply_markup=backup_kb) return snaps.sort(key=lambda s: s["time"], reverse=True) # --- badge --- last = snaps[0] last_time = datetime.fromisoformat( last["time"].replace("Z", "+00:00") ) badge = backup_badge(last_time) # --- buttons --- rows = [] for s in snaps[:5]: t = datetime.fromisoformat( s["time"].replace("Z", "+00:00") ) rows.append([ InlineKeyboardButton( text=f"🧊 {s['short_id']} | {t:%Y-%m-%d %H:%M}", callback_data=f"snap:{s['short_id']}" ) ]) kb = InlineKeyboardMarkup(inline_keyboard=rows) await msg.answer( f"📦 Snapshots ({len(snaps)})\n{badge}", reply_markup=kb ) asyncio.create_task(worker()) async def cmd_backup_now(msg: Message): if not acquire_lock("backup"): await msg.answer("⛔ Backup уже выполняется", reply_markup=backup_kb) return await msg.answer("▶️ Backup запущен", reply_markup=backup_kb) async def worker(): try: rc, out = await run_cmd(["sudo", "/usr/local/bin/backup.py", "restic-backup"], timeout=6*3600) await msg.answer(("✅ OK\n" if rc == 0 else "❌ FAIL\n") + out, reply_markup=backup_kb) finally: release_lock("backup") asyncio.create_task(worker()) async def cmd_artifacts_status(msg: Message): p = Path(ARTIFACT_STATE) if not p.exists(): await msg.answer("❌ state.json не найден", reply_markup=artifacts_kb) return data = json.loads(p.read_text()) lines = [f"🧊 Artifacts ({len(data)})\n"] for name, info in data.items(): t = datetime.fromisoformat(info["updated_at"]) lines.append(f"• {name} — {t:%Y-%m-%d %H:%M}") await msg.answer("\n".join(lines), reply_markup=artifacts_kb) async def cmd_artifacts_upload(msg: Message): if not acquire_lock("artifacts"): await msg.answer("⛔ Upload уже идёт", reply_markup=artifacts_kb) return await msg.answer("📤 Upload…", reply_markup=artifacts_kb) async def worker(): try: rc, out = await run_cmd(["sudo", "/usr/local/bin/backup.py", "artifact-upload"], timeout=12*3600) await msg.answer(("✅ OK\n" if rc == 0 else "❌ FAIL\n") + out, reply_markup=artifacts_kb) finally: release_lock("artifacts") asyncio.create_task(worker()) async def cmd_docker_status(msg: Message): try: if not DOCKER_MAP: await msg.answer( "⚠️ DOCKER_MAP пуст.\n" "Контейнеры не обнаружены.", reply_markup=docker_kb, ) return lines = ["🐳 Docker containers\n"] for alias, real in DOCKER_MAP.items(): rc, raw = await run_cmd( [ "sudo", "docker", "inspect", "-f", "{{.State.Status}}|{{.State.StartedAt}}", real ], timeout=10, ) if rc != 0: lines.append(f"🔴 {alias}: inspect error") continue raw = raw.strip() if "|" not in raw: lines.append(f"🟡 {alias}: invalid inspect output") continue status, started = raw.split("|", 1) up = container_uptime(started) icon = "🟢" if status == "running" else "🔴" lines.append(f"{icon} {alias}: {status} ({up})") await msg.answer("\n".join(lines), reply_markup=docker_kb) except Exception as e: # ⬅️ КРИТИЧЕСКИ ВАЖНО await msg.answer( "❌ Docker status crashed:\n" f"```{type(e).__name__}: {e}```", reply_markup=docker_kb, parse_mode="Markdown", ) async def cmd_security(msg: Message): await msg.answer(security(), reply_markup=system_kb) async def cmd_disks(msg: Message): await msg.answer(disks(), reply_markup=system_kb) # ===================== MENU HANDLERS ===================== @dp.message(F.text == "/start") async def start(msg: Message): if is_admin(msg): await msg.answer("🏠 Главное меню", reply_markup=menu_kb) @dp.message(F.text == "⬅️ Назад") async def back(msg: Message): if is_admin(msg): await msg.answer("🏠 Главное меню", reply_markup=menu_kb) @dp.message(F.text == "🩺 Health") async def h(msg: Message): if is_admin(msg): await cmd_health(msg) @dp.message(F.text == "📊 Статус") async def st(msg: Message): if is_admin(msg): await cmd_status(msg) @dp.message(F.text == "🐳 Docker") async def dm(msg: Message): if is_admin(msg): await msg.answer("🐳 Docker", reply_markup=docker_kb) @dp.message(F.text == "📦 Backup") async def bm(msg: Message): if is_admin(msg): await msg.answer("📦 Backup", reply_markup=backup_kb) @dp.message(F.text == "🧊 Artifacts") async def am(msg: Message): if is_admin(msg): await msg.answer("🧊 Artifacts", reply_markup=artifacts_kb) @dp.message(F.text == "⚙️ System") async def sm(msg: Message): if is_admin(msg): await msg.answer("⚙️ System", reply_markup=system_kb) @dp.message(F.text == "🔄 Restart") async def dr(msg: Message): if is_admin(msg): await msg.answer( "🔄 Выберите контейнер для рестарта:", reply_markup=docker_inline_kb("restart") ) @dp.message(F.text == "📜 Logs") async def dl(msg: Message): if is_admin(msg): await msg.answer( "📜 Выберите контейнер для логов:", reply_markup=docker_inline_kb("logs") ) @dp.message(F.text == "🐳 Status") async def ds(msg: Message): if is_admin(msg): await cmd_docker_status(msg) @dp.message(F.text == "📦 Status") async def bs(msg: Message): if is_admin(msg): await cmd_backup_status(msg) @dp.message(F.text == "📊 Repo stats") async def rs(msg: Message): if is_admin(msg): await cmd_repo_stats(msg) @dp.message(F.text == "▶️ Run backup") async def br(msg: Message): if is_admin(msg): await cmd_backup_now(msg) @dp.message(F.text == "🧯 Restore help") async def rh(msg: Message): if is_admin(msg): await msg.answer( "🧯 Restore help\n\nrestic restore --target /restore", reply_markup=backup_kb, ) @dp.message(F.text == "🧊 Status") async def ars(msg: Message): if is_admin(msg): await cmd_artifacts_status(msg) @dp.message(F.text == "📤 Upload") async def aru(msg: Message): if is_admin(msg): await cmd_artifacts_upload(msg) @dp.message(F.text == "💽 Disks") async def sd(msg: Message): if is_admin(msg): await cmd_disks(msg) @dp.message(F.text == "🔐 Security") async def sec(msg: Message): if is_admin(msg): await cmd_security(msg) @dp.message(F.text == "ℹ️ Help") async def help_cmd(msg: Message): if not is_admin(msg): return await msg.answer( "ℹ️ **Help / Справка**\n\n" "🩺 Health — быстрый health-check сервера\n" "📊 Статус — общая загрузка сервера\n" "🐳 Docker — управление контейнерами\n" "📦 Backup — restic бэкапы\n" "🧊 Artifacts — критичные образы (Clonezilla, NAND)\n" "⚙️ System — диски, безопасность, reboot\n\n" "Inline-кнопки используются для выбора контейнеров.", reply_markup=menu_kb, parse_mode="Markdown", ) # ===================== INLINE CALLBACKS ===================== @dp.callback_query(F.data.startswith("docker:")) async def docker_callback(cb: CallbackQuery): if cb.from_user.id != ADMIN_ID: return _, action, alias = cb.data.split(":", 2) real = DOCKER_MAP[alias] if action == "restart": await cb.answer("Restarting…") rc, out = await run_cmd(["sudo", "docker", "restart", real]) await cb.message.answer( f"🔄 **{alias} restarted**\n```{out}```", parse_mode="Markdown" ) elif action == "logs": await cb.answer("Loading logs…") rc, out = await run_cmd( ["sudo", "docker", "logs", "--tail", "80", real] ) await cb.message.answer( f"📜 **Logs: {alias}**\n```{out}```", parse_mode="Markdown" ) @dp.callback_query(F.data.startswith("snap:")) async def snapshot_details(cb: CallbackQuery): if cb.from_user.id != ADMIN_ID: return snap_id = cb.data.split(":", 1)[1] await cb.answer("Loading snapshot…") # получаем статистику snapshot rc, raw = await run_cmd( ["restic", "stats", snap_id, "--json"], use_restic_env=True, timeout=20 ) if rc != 0: await cb.message.answer(raw) return stats = json.loads(raw) msg = ( f"🧊 **Snapshot {snap_id}**\n\n" f"📁 Files: {stats.get('total_file_count', 'n/a')}\n" f"💾 Size: {stats.get('total_size', 0) / (1024**3):.2f} GiB\n\n" "🧯 Restore:\n" f"`restic restore {snap_id} --target /restore`\n" ) back_kb = InlineKeyboardMarkup( inline_keyboard=[ [ InlineKeyboardButton( text="⬅️ Back to snapshots", callback_data="snapback" ) ] ] ) await cb.message.answer(msg, reply_markup=back_kb, parse_mode="Markdown") @dp.callback_query(F.data == "snapback") async def snapshot_back(cb: CallbackQuery): await cb.answer() # просто вызываем статус снова fake_msg = cb.message await cmd_backup_status(fake_msg) # ===================== WATCHDOG / START ===================== async def notify_start(): await bot.send_message( ADMIN_ID, f"🤖 Bot started\n🖥 {socket.gethostname()}\n🕒 {datetime.now():%Y-%m-%d %H:%M}", reply_markup=menu_kb, ) async def main(): global DOCKER_MAP DOCKER_MAP = await discover_containers(cfg) await notify_start() await dp.start_polling(bot) if __name__ == "__main__": asyncio.run(main())