"""Telegram (aiogram) handlers for backup, restic and systemd job commands."""

import asyncio
import json
import os
from datetime import datetime

from aiogram import F
from aiogram.types import Message, InlineKeyboardMarkup, InlineKeyboardButton, CallbackQuery

from app import dp, cfg
from auth import is_admin_msg, is_admin_cb
from keyboards import backup_kb
from lock_utils import acquire_lock, release_lock
from services.queue import enqueue, format_status, format_details
from services.backup import backup_badge, restore_help
from services.runner import run_cmd, run_cmd_full

# Strong references to fire-and-forget tasks. The event loop only keeps weak
# references, so a task nobody holds may be garbage-collected mid-execution.
_background_tasks: set[asyncio.Task] = set()


def _spawn(coro) -> asyncio.Task:
    """Schedule *coro* as a background task and keep it alive until done."""
    task = asyncio.create_task(coro)
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    return task


def _parse_restic_time(value: str) -> datetime:
    """Parse an ISO-8601 timestamp, tolerating ``Z`` and long fractions.

    ``datetime.fromisoformat`` on Python < 3.11 accepts neither a trailing
    ``Z`` nor fractional seconds that are not exactly 3 or 6 digits long, so
    the fraction is normalised to 6 digits (truncated or zero-padded).

    Raises:
        ValueError: if the normalised text is still not a valid timestamp.
    """
    text = value.strip()
    if text.endswith(("Z", "z")):
        text = text[:-1] + "+00:00"
    head, dot, tail = text.partition(".")
    if dot:
        n_digits = 0
        while n_digits < len(tail) and tail[n_digits] in "0123456789":
            n_digits += 1
        fraction = tail[:n_digits][:6].ljust(6, "0")
        text = f"{head}.{fraction}{tail[n_digits:]}"
    return datetime.fromisoformat(text)


def _json_problem(err: str, label: str) -> str:
    """Return a non-empty error text for a failed or mistyped JSON payload.

    ``_load_json`` reports an empty error string on success, so a payload that
    parsed fine but has the wrong top-level type needs its own message;
    sending an empty text would be rejected by Telegram.
    """
    return err or f"? {label} returned unexpected JSON"


def _parse_systemctl_kv(raw: str) -> dict[str, str]:
    """Parse ``key=value`` lines into a dict; lines without ``=`` are skipped."""
    data: dict[str, str] = {}
    for line in raw.splitlines():
        if "=" not in line:
            continue
        # Split on the first "=" only: values may themselves contain "=".
        key, value = line.split("=", 1)
        data[key.strip()] = value.strip()
    return data


async def _unit_status(unit: str, props: list[str]) -> dict[str, str]:
    """Query ``systemctl show`` for *unit*, restricted to *props*.

    Returns the parsed properties, or ``{"error": <text>}`` when the command
    exits with a non-zero return code.
    """
    args = ["systemctl", "show", unit] + [f"-p{prop}" for prop in props]
    rc, out = await run_cmd(args, timeout=10)
    if rc != 0:
        return {"error": out.strip() or f"systemctl {unit} failed"}
    return _parse_systemctl_kv(out)


def _sudo_cmd(cmd: list[str]) -> list[str]:
    """Return *cmd* unchanged when running as root, else prefixed with ``sudo -E``."""
    if os.geteuid() == 0:
        return cmd
    return ["sudo", "-E"] + cmd


def _format_backup_result(rc: int, out: str) -> str:
    """Build the backup result message: status header, log hint, trimmed output.

    Only the first 20 lines of *out* are included; the number of dropped lines
    is appended when the output is longer.
    """
    log_hint = "log: /var/log/backup-auto.log"
    header = "✅ Backup finished" if rc == 0 else "❌ Backup failed"
    lines = out.strip().splitlines()
    body = "\n".join(lines[:20])
    if len(lines) > 20:
        body += f"\n… trimmed {len(lines) - 20} lines"
    if body:
        return f"{header} (rc={rc})\n{log_hint}\n\n{body}"
    return f"{header} (rc={rc})\n{log_hint}"


def _load_json(raw: str, label: str) -> tuple[bool, object | None, str]:
    """Decode *raw* as JSON.

    Returns ``(ok, data, error)``: on success ``(True, data, "")``; on empty
    or invalid input ``(False, None, message)`` where the message names
    *label* and, for invalid JSON, quotes the first line of the output.
    """
    if not raw or not raw.strip():
        return False, None, f"? {label} returned empty output"
    try:
        return True, json.loads(raw), ""
    except json.JSONDecodeError:
        preview = raw.strip().splitlines()
        head = preview[0] if preview else "invalid output"
        return False, None, f"? {label} invalid JSON: {head}"


async def send_backup_jobs_status(msg: Message):
    """Reply with the systemd service/timer state of each backup-related job."""
    services = [
        ("backup-auto", "backup-auto.timer"),
        ("restic-check", "restic-check.timer"),
        ("weekly-report", "weekly-report.timer"),
    ]
    service_props = ["ActiveState", "SubState", "Result", "ExecMainStatus", "ExecMainExitTimestamp"]
    timer_props = ["LastTriggerUSecRealtime", "NextElapseUSecRealtime"]
    lines = ["🕒 Backup jobs\n"]
    for service, timer in services:
        svc = await _unit_status(f"{service}.service", service_props)
        if "error" in svc:
            lines.append(f"🔴 {service}: {svc['error']}")
            continue
        # Query the timer only once the service lookup succeeded; its values
        # are not shown for a failed service.
        tmr = await _unit_status(timer, timer_props)
        active = svc.get("ActiveState", "n/a")
        result = svc.get("Result", "n/a")
        exit_status = svc.get("ExecMainStatus", "n/a")
        last = svc.get("ExecMainExitTimestamp", "n/a")
        next_run = tmr.get("NextElapseUSecRealtime", "n/a")
        last_trigger = tmr.get("LastTriggerUSecRealtime", "n/a")
        lines.append(f"🧊 {service}: {active} ({result}, rc={exit_status})")
        lines.append(f" Last run: {last}")
        lines.append(f" Last trigger: {last_trigger}")
        lines.append(f" Next: {next_run}")
        lines.append("")
    await msg.answer("\n".join(lines).rstrip(), reply_markup=backup_kb)


async def cmd_repo_stats(msg: Message):
    """Reply with repository-wide restic statistics and the snapshot count.

    A failed stats command or an undecodable/mistyped payload is reported to
    the user and aborts; a failed snapshot listing only yields ``n/a``.
    """
    await msg.answer("⏳ Loading repo stats…", reply_markup=backup_kb)

    # --- restore-size stats ---
    rc1, raw1 = await run_cmd_full(
        ["restic", "stats", "--json"], use_restic_env=True, timeout=30
    )
    if rc1 != 0:
        await msg.answer(raw1, reply_markup=backup_kb)
        return
    ok, restore, err = _load_json(raw1, "restic stats")
    # The payload must be an object: `.get` is called on it below.
    if not ok or not isinstance(restore, dict):
        await msg.answer(_json_problem(err, "restic stats"), reply_markup=backup_kb)
        return

    # --- raw-data stats ---
    rc2, raw2 = await run_cmd_full(
        ["restic", "stats", "--json", "--mode", "raw-data"], use_restic_env=True, timeout=30
    )
    if rc2 != 0:
        await msg.answer(raw2, reply_markup=backup_kb)
        return
    ok, raw_stats, err = _load_json(raw2, "restic stats raw-data")
    if not ok or not isinstance(raw_stats, dict):
        await msg.answer(_json_problem(err, "restic stats raw-data"), reply_markup=backup_kb)
        return

    # --- snapshots count ---
    rc3, raw_snaps = await run_cmd_full(
        ["restic", "snapshots", "--json"], use_restic_env=True, timeout=20
    )
    snaps: int | str = "n/a"
    if rc3 == 0:
        ok, snap_data, _ = _load_json(raw_snaps, "restic snapshots")
        if ok and isinstance(snap_data, list):
            snaps = len(snap_data)

    msg_text = (
        "📦 **Repository stats**\n\n"
        f"🧉 Snapshots: {snaps}\n"
        f"📁 Files: {restore.get('total_file_count', 'n/a')}\n"
        f"💽 Logical size: {restore.get('total_size', 0) / (1024**3):.2f} GiB\n"
        f"🧱 Stored data: {raw_stats.get('total_pack_size', 0) / (1024**2):.2f} MiB\n"
    )
    await msg.answer(msg_text, reply_markup=backup_kb, parse_mode="Markdown")


async def cmd_backup_status(msg: Message):
    """Reply with the newest snapshots as inline buttons, then the job status.

    The restic call runs in a background task so the handler returns at once.
    """
    await msg.answer("⏳ Loading snapshots…", reply_markup=backup_kb)

    async def worker():
        rc, raw = await run_cmd_full(
            ["restic", "snapshots", "--json"], use_restic_env=True, timeout=30
        )
        if rc != 0:
            await msg.answer(raw, reply_markup=backup_kb)
            return
        ok, snaps, err = _load_json(raw, "restic snapshots")
        if not ok or not isinstance(snaps, list):
            await msg.answer(_json_problem(err, "restic snapshots"), reply_markup=backup_kb)
            return
        if not snaps:
            await msg.answer("📦 Snapshots: none", reply_markup=backup_kb)
            return
        snaps.sort(key=lambda s: s["time"], reverse=True)

        # --- badge ---
        last_time = _parse_restic_time(snaps[0]["time"])
        badge = backup_badge(last_time)

        # --- buttons --- (one row per snapshot, newest five only)
        rows = []
        for s in snaps[:5]:
            t = _parse_restic_time(s["time"])
            rows.append([
                InlineKeyboardButton(
                    text=f"🧉 {s['short_id']} | {t:%Y-%m-%d %H:%M}",
                    callback_data=f"snap:{s['short_id']}",
                )
            ])
        kb = InlineKeyboardMarkup(inline_keyboard=rows)
        await msg.answer(f"📦 Snapshots ({len(snaps)})\n{badge}", reply_markup=kb)
        await send_backup_jobs_status(msg)

    _spawn(worker())


async def cmd_backup_now(msg: Message):
    """Queue a backup run and reply with its queue position.

    The queued job honours ``cfg["safety"]["dry_run"]`` and holds the
    ``"backup"`` lock for the duration of the run.
    """

    async def job():
        if cfg.get("safety", {}).get("dry_run", False):
            await msg.answer("🧪 Dry-run: backup skipped", reply_markup=backup_kb)
            return
        if not acquire_lock("backup"):
            await msg.answer("⚠️ Backup уже выполняется", reply_markup=backup_kb)
            return
        try:
            # Sent inside the try so a failed send cannot leave the lock held.
            await msg.answer("▶️ Backup запущен", reply_markup=backup_kb)
            rc, out = await run_cmd(
                _sudo_cmd(["/usr/local/bin/backup.py", "restic-backup"]),
                use_restic_env=True,
                timeout=6 * 3600,
            )
            await msg.answer(_format_backup_result(rc, out), reply_markup=backup_kb)
        finally:
            release_lock("backup")

    pos = await enqueue("backup", job)
    await msg.answer(f"🕓 Backup queued (#{pos})", reply_markup=backup_kb)


async def cmd_last_snapshot(msg: Message):
    """Reply with time, id, file count and size of the most recent snapshot.

    The restic calls run in a background task so the handler returns at once.
    """
    await msg.answer("⏳ Loading last snapshot…", reply_markup=backup_kb)

    async def worker():
        rc, raw = await run_cmd_full(
            ["restic", "snapshots", "--json"], use_restic_env=True, timeout=20
        )
        if rc != 0:
            await msg.answer(raw, reply_markup=backup_kb)
            return
        ok, snaps, err = _load_json(raw, "restic snapshots")
        if not ok or not isinstance(snaps, list):
            await msg.answer(_json_problem(err, "restic snapshots"), reply_markup=backup_kb)
            return
        if not snaps:
            await msg.answer("📦 Snapshots: none", reply_markup=backup_kb)
            return
        snaps.sort(key=lambda s: s["time"], reverse=True)
        s = snaps[0]
        t = _parse_restic_time(s["time"])
        short_id = s["short_id"]

        rc2, raw2 = await run_cmd_full(
            ["restic", "stats", short_id, "--json"], use_restic_env=True, timeout=20
        )
        if rc2 != 0:
            await msg.answer(raw2, reply_markup=backup_kb)
            return
        stats_label = f"restic stats {short_id}"
        ok, stats, err = _load_json(raw2, stats_label)
        if not ok or not isinstance(stats, dict):
            await msg.answer(_json_problem(err, stats_label), reply_markup=backup_kb)
            return

        msg_text = (
            "📦 **Last snapshot**\n\n"
            f"🕒 {t:%Y-%m-%d %H:%M}\n"
            f"🧉 ID: {short_id}\n"
            f"📁 Files: {stats.get('total_file_count', 'n/a')}\n"
            f"💽 Size: {stats.get('total_size', 0) / (1024**3):.2f} GiB\n"
        )
        await msg.answer(msg_text, reply_markup=backup_kb, parse_mode="Markdown")

    _spawn(worker())


@dp.message(F.text == "📦 Status")
async def bs(msg: Message):
    """Admin-only: show snapshot list and job status."""
    if is_admin_msg(msg):
        await cmd_backup_status(msg)


@dp.message(F.text == "📊 Repo stats")
async def rs(msg: Message):
    """Admin-only: show repository statistics."""
    if is_admin_msg(msg):
        await cmd_repo_stats(msg)


@dp.message(F.text == "📦 Last snapshot")
async def ls(msg: Message):
    """Admin-only: show details of the latest snapshot."""
    if is_admin_msg(msg):
        await cmd_last_snapshot(msg)


@dp.message(F.text == "🧾 Queue")
async def qb(msg: Message):
    """Admin-only: show queue status with a button that requests details."""
    if is_admin_msg(msg):
        kb = InlineKeyboardMarkup(
            inline_keyboard=[
                [InlineKeyboardButton(text="Details", callback_data="queue:details")],
            ]
        )
        await msg.answer(format_status(), reply_markup=kb)


@dp.callback_query(F.data == "queue:details")
async def qd(cb: CallbackQuery):
    """Admin-only: answer the "Details" button with the queue details."""
    if not is_admin_cb(cb):
        return
    await cb.answer()
    # The originating message is optional on a callback query.
    if cb.message is not None:
        await cb.message.answer(format_details(), reply_markup=backup_kb)


@dp.message(F.text == "▶️ Run backup")
async def br(msg: Message):
    """Admin-only: queue a backup (keyboard button)."""
    if is_admin_msg(msg):
        await cmd_backup_now(msg)


@dp.message(F.text == "/backup_run")
async def br_cmd(msg: Message):
    """Admin-only: queue a backup (slash command)."""
    if is_admin_msg(msg):
        await cmd_backup_now(msg)


@dp.message(F.text == "🧪 Restic check")
async def rc(msg: Message):
    """Admin-only: queue the restic check script and report its result."""
    if not is_admin_msg(msg):
        return

    async def job():
        await msg.answer("🧪 Restic check запущен", reply_markup=backup_kb)
        rc2, out = await run_cmd(
            _sudo_cmd(["/usr/local/bin/restic-check.sh"]),
            use_restic_env=True,
            timeout=6 * 3600,
        )
        await msg.answer(("✅ OK\n" if rc2 == 0 else "❌ FAIL\n") + out, reply_markup=backup_kb)

    pos = await enqueue("restic-check", job)
    await msg.answer(f"🕓 Restic check queued (#{pos})", reply_markup=backup_kb)


@dp.message(F.text == "📬 Weekly report")
async def wr(msg: Message):
    """Admin-only: queue the weekly report script and report its result."""
    if not is_admin_msg(msg):
        return

    async def job():
        await msg.answer("📬 Weekly report запущен", reply_markup=backup_kb)
        rc2, out = await run_cmd(
            _sudo_cmd(["/usr/local/bin/weekly-report.sh"]),
            use_restic_env=True,
            timeout=3600,
        )
        await msg.answer(("✅ OK\n" if rc2 == 0 else "❌ FAIL\n") + out, reply_markup=backup_kb)

    pos = await enqueue("weekly-report", job)
    await msg.answer(f"🕓 Weekly report queued (#{pos})", reply_markup=backup_kb)


@dp.message(F.text == "🧯 Restore help")
async def rh(msg: Message):
    """Admin-only: show the restore help text."""
    if is_admin_msg(msg):
        await msg.answer(restore_help(), reply_markup=backup_kb)