# tg-admin-bot/handlers/backup.py
# (Header copied from a repository file browser: 568 lines, 18 KiB, Python.)

import asyncio
import json
import os
from datetime import datetime
from aiogram import F
from aiogram.types import Message, InlineKeyboardMarkup, InlineKeyboardButton, CallbackQuery
from app import dp, cfg
from auth import is_admin_msg, is_admin_cb
from keyboards import backup_kb
from lock_utils import acquire_lock, release_lock
from services.queue import enqueue, format_status, format_details
from services.backup import backup_badge, restore_help
from services.runner import run_cmd, run_cmd_full
def _parse_systemctl_kv(raw: str) -> dict[str, str]:
    """Turn ``systemctl show`` output (one ``Key=Value`` per line) into a dict.

    Lines without ``=`` are ignored. Only the first ``=`` separates key from
    value. Keys and values are whitespace-stripped, and a repeated key keeps its
    last value.
    """
    pairs = (line.split("=", 1) for line in raw.splitlines() if "=" in line)
    return {key.strip(): value.strip() for key, value in pairs}
async def _unit_status(unit: str, props: list[str]) -> dict[str, str]:
    """Query selected properties of a systemd unit via ``systemctl show``.

    On exit code 0, returns the parsed ``Key=Value`` mapping. Otherwise returns
    ``{"error": <message>}``. The message is the stripped command output, or
    ``systemctl <unit> failed`` when that output is blank.
    """
    cmd = ["systemctl", "show", unit, *(f"-p{name}" for name in props)]
    code, output = await run_cmd(cmd, timeout=10)
    if code == 0:
        return _parse_systemctl_kv(output)
    return {"error": output.strip() or f"systemctl {unit} failed"}
def _sudo_cmd(cmd: list[str]) -> list[str]:
    """Prefix *cmd* with ``sudo -E`` unless the process already runs as root.

    When running as root, the very same list object is returned. Otherwise a
    new list is built.
    """
    running_as_root = os.geteuid() == 0
    return cmd if running_as_root else ["sudo", "-E", *cmd]
def _format_backup_result(
    rc: int,
    out: str,
    log_path: str = "/var/log/backup-auto.log",
    max_len: int = 4000,
) -> str:
    """Build the chat message that reports the outcome of a backup run.

    Args:
        rc: Exit code of the backup command; 0 means success.
        out: Command output. Only its first 20 lines are shown, followed by a
            note with the number of trimmed lines.
        log_path: Backup log path shown in the message. On failure its last
            40 lines are appended. A read error is reported inline instead of
            being swallowed.
        max_len: Upper bound on the returned text length. Telegram rejects
            messages longer than 4096 characters, so the log tail is cut from
            its start to fit (and, as a last resort, the whole text is cut).

    Returns:
        The formatted message text.
    """
    from collections import deque

    header = "✅ Backup finished" if rc == 0 else "❌ Backup failed"
    text = f"{header} (rc={rc})\nlog: {log_path}"

    out_lines = out.strip().splitlines()
    body = "\n".join(out_lines[:20])
    if len(out_lines) > 20:
        body += f"\n… trimmed {len(out_lines) - 20} lines"
    if body:
        text += "\n\n" + body

    extra = ""
    if rc != 0 and os.path.exists(log_path):
        try:
            with open(log_path, "r", encoding="utf-8", errors="replace") as f:
                # deque keeps only the last 40 lines instead of loading the whole log.
                tail = "".join(deque(f, maxlen=40)).strip()
        except OSError as e:
            extra = f"\n\n⚠️ Failed to read log tail: {e}"
        else:
            if tail:
                extra = "\n\nLog tail:\n" + tail

    if len(text) + len(extra) <= max_len:
        return text + extra
    # Too long: keep the header/output and the newest end of the log tail.
    keep = max_len - len(text) - len("\n\n…")
    if extra and keep > 0:
        return text + "\n\n…" + extra[-keep:]
    return (text + extra)[: max(max_len - 1, 0)] + "…"
def _tail(path: str, lines: int = 120) -> str:
    """Return the last *lines* lines of a text file, stripped, for display.

    The file is streamed, so at most *lines* lines are held in memory.

    Returns:
        The joined tail, or one of these fallbacks:
          * ``"(empty)"`` when nothing remains after stripping, including
            when ``lines <= 0``;
          * ``"⚠️ Log not found: <path>"`` if the file does not exist;
          * ``"⚠️ Failed to read log: <error>"`` on any other OS error.
    """
    from collections import deque

    # EAFP: opening directly avoids the exists()/open() race.
    try:
        with open(path, "r", encoding="utf-8", errors="replace") as f:
            data = deque(f, maxlen=max(lines, 0))
    except FileNotFoundError:
        return f"⚠️ Log not found: {path}"
    except OSError as e:
        return f"⚠️ Failed to read log: {e}"
    return "".join(data).strip() or "(empty)"
def _beautify_restic_forget(raw: str) -> str | None:
"""
Parse restic forget output tables into a compact bullet list.
"""
if "Reasons" not in raw or "Paths" not in raw:
return None
import re
lines = raw.splitlines()
headers = []
for idx, line in enumerate(lines):
if line.startswith("ID") and "Reasons" in line and "Paths" in line:
headers.append(idx)
if not headers:
return None
def _valid_id(val: str) -> bool:
return bool(re.fullmatch(r"[0-9a-f]{7,64}", val.strip()))
def parse_block(start_idx: int, end_idx: int) -> list[dict]:
header = lines[start_idx]
cols = ["ID", "Time", "Host", "Tags", "Reasons", "Paths", "Size"]
positions = []
for name in cols:
pos = header.find(name)
if pos == -1:
return []
positions.append(pos)
positions.append(len(header))
entries: list[dict] = []
current: dict | None = None
for line in lines[start_idx + 2 : end_idx]:
if not line.strip():
continue
segments = []
for i in range(len(cols)):
segments.append(line[positions[i] : positions[i + 1]].strip())
row = dict(zip(cols, segments))
if row["ID"] and _valid_id(row["ID"]):
current = {
"id": row["ID"],
"time": row["Time"],
"host": row["Host"],
"size": row["Size"],
"tags": row["Tags"],
"reasons": [],
"paths": [],
}
if row["Reasons"]:
current["reasons"].append(row["Reasons"])
if row["Paths"]:
current["paths"].append(row["Paths"])
entries.append(current)
elif current:
if row["Reasons"] and not row["Reasons"].startswith("-"):
current["reasons"].append(row["Reasons"])
if row["Paths"] and not row["Paths"].startswith("-"):
current["paths"].append(row["Paths"])
return entries
blocks = []
for i, start in enumerate(headers):
end = headers[i + 1] if i + 1 < len(headers) else len(lines)
entries = parse_block(start, end)
if not entries:
continue
label = "Plan"
prev_line = lines[start - 1].lower() if start - 1 >= 0 else ""
prev2 = lines[start - 2].lower() if start - 2 >= 0 else ""
if "keep" in prev_line:
label = prev_line.strip()
elif "keep" in prev2:
label = prev2.strip()
elif "snapshots" in prev_line:
label = prev_line.strip()
blocks.append((label, entries))
if not blocks:
return None
out_lines = []
for label, entries in blocks:
out_lines.append(f"📦 {label}")
for e in entries:
head = f"🧉 {e['id']} | {e['time']} | {e['host']} | {e['size'] or 'n/a'}"
out_lines.append(head)
if e["reasons"]:
out_lines.append(" 📌 " + "; ".join(e["reasons"]))
if e["paths"]:
for p in e["paths"]:
out_lines.append(f"{p}")
out_lines.append("")
return "\n".join(out_lines).rstrip()
def _load_json(raw: str, label: str) -> tuple[bool, object | None, str]:
if not raw or not raw.strip():
return False, None, f"? {label} returned empty output"
try:
return True, json.loads(raw), ""
except json.JSONDecodeError:
preview = raw.strip().splitlines()
head = preview[0] if preview else "invalid output"
return False, None, f"? {label} invalid JSON: {head}"
async def send_backup_jobs_status(msg: Message):
    """Reply with a summary of the backup-related systemd services and timers.

    For each job it lists:
      * the service's ActiveState, Result and ExecMainStatus;
      * its last exit time;
      * the timer's last trigger and next elapse time.
    If the service query fails, that job gets a single 🔴 error line instead.
    Units are queried one after another.
    """
    jobs = (
        ("backup-auto", "backup-auto.timer"),
        ("restic-check", "restic-check.timer"),
        ("weekly-report", "weekly-report.timer"),
    )
    svc_fields = ["ActiveState", "SubState", "Result", "ExecMainStatus", "ExecMainExitTimestamp"]
    timer_fields = ["LastTriggerUSecRealtime", "NextElapseUSecRealtime"]

    report = ["🕒 Backup jobs\n"]
    for name, timer_unit in jobs:
        service_info = await _unit_status(f"{name}.service", svc_fields)
        timer_info = await _unit_status(timer_unit, timer_fields)
        if "error" in service_info:
            report.append(f"🔴 {name}: {service_info['error']}")
            continue
        na = "n/a"
        report += [
            f"🧊 {name}: {service_info.get('ActiveState', na)} "
            f"({service_info.get('Result', na)}, rc={service_info.get('ExecMainStatus', na)})",
            f" Last run: {service_info.get('ExecMainExitTimestamp', na)}",
            f" Last trigger: {timer_info.get('LastTriggerUSecRealtime', na)}",
            f" Next: {timer_info.get('NextElapseUSecRealtime', na)}",
            "",
        ]
    await msg.answer("\n".join(report).rstrip(), reply_markup=backup_kb)
async def cmd_repo_stats(msg: Message):
    """Reply with repository-wide restic statistics.

    Runs, in order:
      * ``restic stats --json`` (default mode) for file count and logical size;
      * ``restic stats --json --mode raw-data`` for the stored data size;
      * ``restic snapshots --json`` for the snapshot count.
    Either stats call failing aborts with an error reply. That covers a
    non-zero exit, invalid JSON, or JSON that is not an object. A failed
    snapshot listing only turns the count into ``n/a``.
    """

    async def fetch(args: list[str], label: str, timeout: int) -> tuple[object | None, str | None]:
        # (data, None) on success, (None, user-facing error text) otherwise.
        code, output = await run_cmd_full(args, use_restic_env=True, timeout=timeout)
        if code != 0:
            # Never send an empty message (Telegram rejects it) or an oversized one.
            return None, (output.strip() or f"⚠️ {label} failed (rc={code})")[:4000]
        ok, data, err = _load_json(output, label)
        return (data, None) if ok else (None, err)

    await msg.answer("⏳ Loading repo stats…", reply_markup=backup_kb)

    restore, err = await fetch(["restic", "stats", "--json"], "restic stats", 30)
    if err is None and not isinstance(restore, dict):
        err = "⚠️ restic stats returned unexpected JSON"
    if err is not None:
        await msg.answer(err, reply_markup=backup_kb)
        return

    raw, err = await fetch(
        ["restic", "stats", "--json", "--mode", "raw-data"], "restic stats raw-data", 30
    )
    if err is None and not isinstance(raw, dict):
        err = "⚠️ restic stats raw-data returned unexpected JSON"
    if err is not None:
        await msg.answer(err, reply_markup=backup_kb)
        return

    snap_data, _ = await fetch(["restic", "snapshots", "--json"], "restic snapshots", 20)
    snaps = len(snap_data) if isinstance(snap_data, list) else "n/a"

    # NOTE(review): restic's stats JSON documents ``total_size`` (in raw-data
    # mode: size of the stored blobs). ``total_pack_size`` is still preferred
    # in case the installed restic reports it — confirm against its output.
    stored = raw.get("total_pack_size", raw.get("total_size")) or 0
    logical = restore.get("total_size") or 0

    msg_text = (
        # Telegram's legacy Markdown marks bold with single asterisks.
        "📦 *Repository stats*\n\n"
        f"🧉 Snapshots: {snaps}\n"
        f"📁 Files: {restore.get('total_file_count', 'n/a')}\n"
        f"💽 Logical size: {logical / (1024**3):.2f} GiB\n"
        f"🧱 Stored data: {stored / (1024**2):.2f} MiB\n"
    )
    await msg.answer(msg_text, reply_markup=backup_kb, parse_mode="Markdown")
# Strong references to in-flight worker tasks: asyncio keeps only weak ones,
# so an unreferenced task may be garbage-collected before it finishes.
_status_tasks: set[asyncio.Task] = set()


async def cmd_backup_status(msg: Message):
    """Reply with the snapshot overview and the backup job status.

    A "loading" placeholder is sent right away. A background task then:
      * runs ``restic snapshots --json``;
      * replies with the snapshot count and ``backup_badge`` for the newest one;
      * adds inline buttons for the five newest (``snap:<short_id>`` callbacks);
      * follows up with ``send_backup_jobs_status``.
    Failures, including unexpected exceptions, are reported in chat instead of
    leaving the user on the placeholder.
    """
    import logging
    import re

    def parse_time(value: str) -> datetime:
        # restic prints RFC 3339 times with up to 9 fractional digits (or "Z").
        # Normalise to 6 digits so datetime.fromisoformat also accepts them
        # on Python < 3.11.
        value = value.replace("Z", "+00:00")
        value = re.sub(
            r"\.(\d+)", lambda m: "." + m.group(1)[:6].ljust(6, "0"), value, count=1
        )
        return datetime.fromisoformat(value)

    async def worker():
        rc, raw = await run_cmd_full(
            ["restic", "snapshots", "--json"],
            use_restic_env=True,
            timeout=30,
        )
        if rc != 0:
            await msg.answer(
                (raw.strip() or f"⚠️ restic snapshots failed (rc={rc})")[:4000],
                reply_markup=backup_kb,
            )
            return
        ok, snaps, err = _load_json(raw, "restic snapshots")
        if not ok or not isinstance(snaps, list):
            # err is "" when the JSON parsed but is not a list; never send empty text.
            await msg.answer(err or "⚠️ restic snapshots returned unexpected JSON", reply_markup=backup_kb)
            return
        if not snaps:
            await msg.answer("📦 Snapshots: none", reply_markup=backup_kb)
            return

        # Sort by parsed time (robust to differing UTC offsets), newest first.
        dated = sorted(
            ((parse_time(s["time"]), s) for s in snaps),
            key=lambda pair: pair[0],
            reverse=True,
        )
        badge = backup_badge(dated[0][0])
        rows = [
            [
                InlineKeyboardButton(
                    text=f"🧉 {s['short_id']} | {t:%Y-%m-%d %H:%M}",
                    callback_data=f"snap:{s['short_id']}",
                )
            ]
            for t, s in dated[:5]
        ]
        await msg.answer(
            f"📦 Snapshots ({len(snaps)})\n{badge}",
            reply_markup=InlineKeyboardMarkup(inline_keyboard=rows),
        )
        await send_backup_jobs_status(msg)

    async def guarded():
        # Task boundary: report unexpected failures instead of losing them.
        try:
            await worker()
        except Exception as e:
            logging.getLogger(__name__).exception("backup status failed")
            await msg.answer(f"⚠️ Failed to load snapshots: {e}", reply_markup=backup_kb)

    await msg.answer("⏳ Loading snapshots…", reply_markup=backup_kb)
    task = asyncio.create_task(guarded())
    _status_tasks.add(task)
    task.add_done_callback(_status_tasks.discard)
async def cmd_backup_now(msg: Message):
    """Queue a backup run on behalf of *msg*; thin alias for ``schedule_backup``."""
    return await schedule_backup(msg)
async def schedule_backup(msg: Message):
    """Queue a restic backup job and acknowledge it in chat.

    When the queue runs the job, it:
      * skips the run if ``cfg["safety"]["dry_run"]`` is true;
      * refuses to start while the ``backup`` lock is held;
      * otherwise runs ``backup.py restic-backup`` (through sudo when not
        root, 6 h timeout) and replies with the formatted result;
      * on failure, attaches a "Retry backup" button.
    Once acquired, the lock is released even if a chat message or the command
    raises.

    After queuing, an incident is recorded (best-effort).
    """
    async def job():
        if cfg.get("safety", {}).get("dry_run", False):
            await msg.answer("🧪 Dry-run: backup skipped", reply_markup=backup_kb)
            return
        if not acquire_lock("backup"):
            await msg.answer("⚠️ Backup уже выполняется", reply_markup=backup_kb)
            return
        try:
            # Inside the try: if this send fails, the lock must still be released.
            await msg.answer("▶️ Backup запущен", reply_markup=backup_kb)
            rc, out = await run_cmd(
                _sudo_cmd(["/usr/local/bin/backup.py", "restic-backup"]),
                use_restic_env=True,
                timeout=6 * 3600,
            )
            if rc == 0:
                kb = backup_kb
            else:
                kb = InlineKeyboardMarkup(
                    inline_keyboard=[
                        [InlineKeyboardButton(text="🔁 Retry backup", callback_data="backup:retry")]
                    ]
                )
            await msg.answer(_format_backup_result(rc, out), reply_markup=kb)
        finally:
            release_lock("backup")

    pos = await enqueue("backup", job)
    await msg.answer(f"🕓 Backup queued (#{pos})", reply_markup=backup_kb)
    try:
        from services.incidents import log_incident

        # NOTE(review): the retry callback passes the bot's own message here, so
        # from_user is presumably the bot rather than the clicking admin — confirm.
        log_incident(cfg, f"backup_queued by {msg.from_user.id}", category="backup")
    except Exception:
        # Incident logging is best-effort and must never affect queuing.
        pass
# Strong references to in-flight worker tasks: asyncio keeps only weak ones,
# so an unreferenced task may be garbage-collected before it finishes.
_last_snapshot_tasks: set[asyncio.Task] = set()


async def cmd_last_snapshot(msg: Message):
    """Reply with details of the newest restic snapshot.

    A "loading" placeholder is sent right away. A background task then:
      * runs ``restic snapshots --json`` and picks the newest snapshot by time;
      * runs ``restic stats <short_id> --json`` for it;
      * replies with its time, short id, file count and size in GiB.
    Failures, including unexpected exceptions, are reported in chat.
    """
    import logging
    import re

    def parse_time(value: str) -> datetime:
        # restic prints RFC 3339 times with up to 9 fractional digits (or "Z").
        # Normalise to 6 digits so datetime.fromisoformat also accepts them
        # on Python < 3.11.
        value = value.replace("Z", "+00:00")
        value = re.sub(
            r"\.(\d+)", lambda m: "." + m.group(1)[:6].ljust(6, "0"), value, count=1
        )
        return datetime.fromisoformat(value)

    async def worker():
        rc, raw = await run_cmd_full(
            ["restic", "snapshots", "--json"],
            use_restic_env=True,
            timeout=20,
        )
        if rc != 0:
            await msg.answer(
                (raw.strip() or f"⚠️ restic snapshots failed (rc={rc})")[:4000],
                reply_markup=backup_kb,
            )
            return
        ok, snaps, err = _load_json(raw, "restic snapshots")
        if not ok or not isinstance(snaps, list):
            # err is "" when the JSON parsed but is not a list; never send empty text.
            await msg.answer(err or "⚠️ restic snapshots returned unexpected JSON", reply_markup=backup_kb)
            return
        if not snaps:
            await msg.answer("📦 Snapshots: none", reply_markup=backup_kb)
            return

        # Newest by parsed time (robust to differing UTC offsets).
        t, s = max(((parse_time(snap["time"]), snap) for snap in snaps), key=lambda pair: pair[0])
        short_id = s["short_id"]

        rc2, raw2 = await run_cmd_full(
            ["restic", "stats", short_id, "--json"],
            use_restic_env=True,
            timeout=20,
        )
        if rc2 != 0:
            await msg.answer(
                (raw2.strip() or f"⚠️ restic stats {short_id} failed (rc={rc2})")[:4000],
                reply_markup=backup_kb,
            )
            return
        ok, stats, err = _load_json(raw2, f"restic stats {short_id}")
        if not ok or not isinstance(stats, dict):
            await msg.answer(
                err or f"⚠️ restic stats {short_id} returned unexpected JSON",
                reply_markup=backup_kb,
            )
            return

        size_gib = (stats.get("total_size") or 0) / (1024**3)
        msg_text = (
            # Telegram's legacy Markdown marks bold with single asterisks.
            "📦 *Last snapshot*\n\n"
            f"🕒 {t:%Y-%m-%d %H:%M}\n"
            f"🧉 ID: {short_id}\n"
            f"📁 Files: {stats.get('total_file_count', 'n/a')}\n"
            f"💽 Size: {size_gib:.2f} GiB\n"
        )
        await msg.answer(msg_text, reply_markup=backup_kb, parse_mode="Markdown")

    async def guarded():
        # Task boundary: report unexpected failures instead of losing them.
        try:
            await worker()
        except Exception as e:
            logging.getLogger(__name__).exception("last snapshot failed")
            await msg.answer(f"⚠️ Failed to load last snapshot: {e}", reply_markup=backup_kb)

    await msg.answer("⏳ Loading last snapshot…", reply_markup=backup_kb)
    task = asyncio.create_task(guarded())
    _last_snapshot_tasks.add(task)
    task.add_done_callback(_last_snapshot_tasks.discard)
@dp.message(F.text == "📦 Status")
async def bs(msg: Message):
    """Handle the "📦 Status" button: admins get the snapshot overview."""
    if not is_admin_msg(msg):
        return
    await cmd_backup_status(msg)
@dp.message(F.text == "📊 Repo stats")
async def rs(msg: Message):
    """Handle the "📊 Repo stats" button: admins get repository statistics."""
    if not is_admin_msg(msg):
        return
    await cmd_repo_stats(msg)
@dp.message(F.text == "📦 Last snapshot")
async def ls(msg: Message):
    """Handle the "📦 Last snapshot" button: admins get the newest snapshot's details."""
    if not is_admin_msg(msg):
        return
    await cmd_last_snapshot(msg)
@dp.message(F.text == "🧾 Queue")
async def qb(msg: Message):
    """Show admins the job-queue status, with a "Details" inline button."""
    if not is_admin_msg(msg):
        return
    details_button = InlineKeyboardButton(text="Details", callback_data="queue:details")
    markup = InlineKeyboardMarkup(inline_keyboard=[[details_button]])
    await msg.answer(format_status(), reply_markup=markup)
@dp.callback_query(F.data == "queue:details")
async def qd(cb: CallbackQuery):
    """Answer the queue "Details" button with the detailed listing (admins only).

    Callbacks from non-admins are ignored and left unanswered.
    """
    if is_admin_cb(cb):
        await cb.answer()
        await cb.message.answer(format_details(), reply_markup=backup_kb)
@dp.message(F.text == "▶️ Run backup")
async def br(msg: Message):
    """Handle the "▶️ Run backup" button: queue a backup for admins."""
    if not is_admin_msg(msg):
        return
    await cmd_backup_now(msg)
@dp.message(F.text == "/backup_run")
async def br_cmd(msg: Message):
    """Handle the exact text ``/backup_run``: queue a backup for admins."""
    if not is_admin_msg(msg):
        return
    await schedule_backup(msg)
@dp.message(F.text == "🧪 Restic check")
async def rc(msg: Message):
    """Queue ``restic-check.sh`` for admins and report its result in chat.

    The queued job runs the script (through sudo when not root, 6 h timeout).
    It replies with OK/FAIL plus the script output. On failure it attaches a
    "Retry restic check" button (``backup:retry_check``). Output longer than
    3500 characters is cut to its last 3500 so the reply stays under
    Telegram's 4096-character message limit.
    """
    if not is_admin_msg(msg):
        return

    async def job():
        await msg.answer("🧪 Restic check запущен", reply_markup=backup_kb)
        rc2, out = await run_cmd(
            _sudo_cmd(["/usr/local/bin/restic-check.sh"]),
            use_restic_env=True,
            timeout=6 * 3600,
        )
        kb = backup_kb
        if rc2 != 0:
            kb = InlineKeyboardMarkup(
                inline_keyboard=[
                    [InlineKeyboardButton(text="🔁 Retry restic check", callback_data="backup:retry_check")]
                ]
            )
        if len(out) > 3500:
            # Keep the most recent output (its end).
            out = "…\n" + out[-3500:]
        await msg.answer(("✅ OK\n" if rc2 == 0 else "❌ FAIL\n") + out, reply_markup=kb)

    pos = await enqueue("restic-check", job)
    await msg.answer(f"🕓 Restic check queued (#{pos})", reply_markup=backup_kb)
@dp.message(F.text == "📬 Weekly report")
async def wr(msg: Message):
    """Queue ``weekly-report.sh`` for admins and post its result in chat.

    The queued job runs the script (through sudo when not root, 1 h timeout)
    and replies with OK/FAIL plus the script output. Output longer than 3500
    characters is cut to its last 3500 so the reply stays under Telegram's
    4096-character message limit.
    """
    if not is_admin_msg(msg):
        return

    async def job():
        await msg.answer("📬 Weekly report запущен", reply_markup=backup_kb)
        rc2, out = await run_cmd(
            _sudo_cmd(["/usr/local/bin/weekly-report.sh"]),
            use_restic_env=True,
            timeout=3600,
        )
        if len(out) > 3500:
            # Keep the most recent output (its end).
            out = "…\n" + out[-3500:]
        await msg.answer(("✅ OK\n" if rc2 == 0 else "❌ FAIL\n") + out, reply_markup=backup_kb)

    pos = await enqueue("weekly-report", job)
    await msg.answer(f"🕓 Weekly report queued (#{pos})", reply_markup=backup_kb)
@dp.message(F.text == "🧯 Restore help")
async def rh(msg: Message):
    """Handle the "🧯 Restore help" button: admins get the restore instructions."""
    if not is_admin_msg(msg):
        return
    await msg.answer(restore_help(), reply_markup=backup_kb)
@dp.message(F.text == "📜 History")
@dp.message(F.text == "/backup_history")
async def backup_history(msg: Message):
    """Show admins the tail of the backup log.

    Behaviour:
      * Reads the last 160 lines of ``/var/log/backup-auto.log``.
      * If they contain restic ``forget`` tables, shows the compact summary
        from ``_beautify_restic_forget``.
      * Otherwise shows the raw tail in a preformatted block.
      * Either text is cut to its last 3500 characters so the message stays
        under Telegram's 4096-character limit; the header then says
        "(trimmed)".
      * Text is sent HTML-escaped, because log lines may contain backticks or
        ``<``/``&`` that break Telegram's entity parsing.
    Read errors from ``_tail`` are sent as-is.
    """
    import html

    if not is_admin_msg(msg):
        return
    log_path = "/var/log/backup-auto.log"
    content = _tail(log_path, lines=160)
    if content.startswith("⚠️"):
        await msg.answer(content, reply_markup=backup_kb)
        return

    pretty = _beautify_restic_forget(content)
    text = pretty or content
    max_len = 3500
    trimmed = len(text) > max_len
    if trimmed:
        # Keep the newest part of the log (its end).
        text = text[-max_len:]

    header = "📜 Backup history (tail)"
    if trimmed:
        header += " (trimmed)"
    path_html = f"<code>{html.escape(log_path)}</code>"
    if pretty:
        body = f"{header}\n{path_html}\n\n{html.escape(text)}"
    else:
        body = f"{header}\n{path_html}\n<pre>{html.escape(text)}</pre>"
    await msg.answer(body, reply_markup=backup_kb, parse_mode="HTML")
@dp.callback_query(F.data == "backup:retry")
async def backup_retry(cb: CallbackQuery):
    """Handle the "🔁 Retry backup" button: re-queue a backup (admins only).

    Replies go to the message that carries the button, i.e. the same chat.
    """
    if is_admin_cb(cb):
        await cb.answer("Queuing backup…")
        await schedule_backup(cb.message)
@dp.callback_query(F.data == "backup:retry_check")
async def backup_retry_check(cb: CallbackQuery):
    """Handle the "🔁 Retry restic check" button by re-entering the ``rc`` handler.

    NOTE(review): ``rc`` re-checks ``is_admin_msg`` on ``cb.message``, which is
    the bot's own message carrying the button. If ``is_admin_msg`` checks the
    sender (``from_user``) rather than the chat, the retry is silently ignored.
    Confirm against ``auth.is_admin_msg``.
    """
    if not is_admin_cb(cb):
        return
    await cb.answer("Queuing restic check…")
    await rc(cb.message)