1052 lines
36 KiB
Python
1052 lines
36 KiB
Python
import asyncio
|
||
import os
|
||
from datetime import datetime, timezone, timedelta
|
||
from aiogram import F
|
||
from aiogram.types import Message, CallbackQuery, InlineKeyboardMarkup, InlineKeyboardButton, InputFile, BufferedInputFile
|
||
from app import dp, cfg
|
||
from auth import is_admin_msg
|
||
from keyboards import (
|
||
system_info_kb,
|
||
system_ops_kb,
|
||
system_logs_audit_kb,
|
||
system_logs_security_kb,
|
||
system_logs_integrations_kb,
|
||
system_logs_kb,
|
||
openwrt_kb,
|
||
docker_kb,
|
||
)
|
||
from system_checks import security, disks, hardware, list_disks, smart_last_test
|
||
from services.http_checks import get_url_checks, check_url
|
||
from services.queue import enqueue, get_history_raw, get_stats
|
||
from services.updates import list_updates, apply_updates
|
||
from services.runner import run_cmd, run_cmd_full
|
||
from services.npmplus import fetch_certificates, format_certificates, list_proxy_hosts, set_proxy_host
|
||
from services.gitea import get_gitea_health
|
||
from services.openwrt import get_openwrt_status, fetch_openwrt_leases
|
||
from services.system import worst_disk_usage
|
||
import state
|
||
from state import UPDATES_CACHE, REBOOT_PENDING
|
||
from services.metrics import summarize
|
||
from services.audit import read_audit_tail
|
||
from services.incidents import read_recent, incidents_path, read_raw, infer_category
|
||
from services.external_checks import format_report
|
||
from services.disk_report import build_disk_report
|
||
from services import runtime_state
|
||
import io
|
||
import json
|
||
import csv
|
||
import zipfile
|
||
|
||
|
||
@dp.message(F.text == "💽 Disks")
async def sd(msg: Message):
    """Reply with the disk report produced by ``system_checks.disks`` (admins only)."""
    if not is_admin_msg(msg):
        return
    report = disks()
    await msg.answer(report, reply_markup=system_info_kb)
|
||
|
||
|
||
@dp.message(F.text == "🔐 Security")
async def sec(msg: Message):
    """Reply with the report produced by ``system_checks.security`` (admins only)."""
    if not is_admin_msg(msg):
        return
    report = security()
    await msg.answer(report, reply_markup=system_info_kb)
|
||
|
||
|
||
@dp.message(F.text == "🌐 URLs")
async def urls(msg: Message):
    """Probe every configured URL and report the results.

    An immediate "in progress" reply is sent. A background task runs all
    ``check_url`` calls concurrently in worker threads and sends one summary
    line per URL when they finish.
    """
    if not is_admin_msg(msg):
        return

    targets = list(get_url_checks(cfg))
    if not targets:
        await msg.answer("⚠️ Нет URL для проверки", reply_markup=system_logs_security_kb)
        return

    await msg.answer("⏳ Проверяю URL…", reply_markup=system_logs_security_kb)

    def render(alias, ok, status, ms, err) -> str:
        # Successes show the status code. Failures show the status code when
        # one was received, otherwise the error text (or a generic "error").
        if ok:
            return f"🟢 {alias}: {status} ({ms}ms)"
        detail = status if status is not None else (err or "error")
        return f"🔴 {alias}: {detail} ({ms}ms)"

    async def probe_all():
        outcomes = await asyncio.gather(
            *[asyncio.to_thread(check_url, url) for _alias, url in targets]
        )
        report = ["🌐 URLs\n"]
        for (alias, _url), (ok, status, ms, err) in zip(targets, outcomes):
            report.append(render(alias, ok, status, ms, err))
        await msg.answer("\n".join(report), reply_markup=system_logs_security_kb)

    asyncio.create_task(probe_all())
|
||
|
||
|
||
@dp.message(F.text == "🔑 SSH log")
async def ssh_log(msg: Message):
    """Show accepted SSH logins.

    journald (identifiers ``sshd`` and ``sshd-session``, ``--since 00:00``) is
    queried first. If that yields no "Accepted" lines, the last 200 lines of
    ``/var/log/auth.log`` are scanned instead. At most the 30 most recent
    matches are shown, each condensed by ``_format_ssh_line``. The work runs
    in a background task.
    """
    if not is_admin_msg(msg):
        return

    await msg.answer("⏳ Loading SSH logins…", reply_markup=system_logs_security_kb)

    def accepted(rc, output) -> list[str]:
        # Keep only successful-login lines, and only from a command that
        # succeeded and printed something non-blank.
        if rc != 0 or not output.strip():
            return []
        return [entry for entry in output.splitlines() if "Accepted" in entry]

    async def collect_and_reply():
        journal_cmd = [
            "journalctl",
            "-t", "sshd",
            "-t", "sshd-session",
            "--since", "00:00",
            "--no-pager",
        ]
        rc, output = await run_cmd(journal_cmd, timeout=20)
        matches = accepted(rc, output)

        if not matches:
            rc, output = await run_cmd(["tail", "-n", "200", "/var/log/auth.log"], timeout=10)
            matches = accepted(rc, output)

        if not matches:
            await msg.answer("🔑 SSH log\n\n(no logins today)", reply_markup=system_logs_security_kb)
            return

        rendered = "\n".join(_format_ssh_line(entry) for entry in matches[-30:])
        await msg.answer(
            "🔑 SSH logins (today)\n```\n" + rendered + "\n```",
            reply_markup=system_logs_security_kb,
            parse_mode="Markdown",
        )

    asyncio.create_task(collect_and_reply())
|
||
|
||
|
||
def _format_ssh_line(line: str) -> str:
|
||
parts = line.split()
|
||
if len(parts) < 10:
|
||
return line
|
||
month, day, time_str = parts[0], parts[1], parts[2]
|
||
user = "?"
|
||
ip = "?"
|
||
key_type = "?"
|
||
if "for" in parts:
|
||
i = parts.index("for")
|
||
if i + 1 < len(parts):
|
||
user = parts[i + 1]
|
||
if "from" in parts:
|
||
i = parts.index("from")
|
||
if i + 1 < len(parts):
|
||
ip = parts[i + 1]
|
||
if "ssh2:" in parts:
|
||
i = parts.index("ssh2:")
|
||
if i + 1 < len(parts):
|
||
key_type = parts[i + 1]
|
||
return f"{month} {day} {time_str} {user} @ {ip} ({key_type})"
|
||
|
||
|
||
@dp.message(F.text == "📈 Metrics")
async def metrics(msg: Message):
    """Reply with a 15-minute summary of ``state.METRICS_STORE`` (admins only)."""
    if not is_admin_msg(msg):
        return
    store = state.METRICS_STORE
    if store is None:
        await msg.answer("⚠️ Metrics not initialized", reply_markup=system_info_kb)
    else:
        await msg.answer(summarize(store, minutes=15), reply_markup=system_info_kb)
|
||
|
||
|
||
@dp.message(F.text == "🧪 SMART test")
async def smart_test(msg: Message):
    """Start a SMART short self-test on every disk from ``list_disks``.

    Tests are started sequentially in a background task (``sudo smartctl -t
    short``, 20 s timeout each), and one summary message is sent at the end.
    """
    if not is_admin_msg(msg):
        return

    devices = list_disks()
    if not devices:
        await msg.answer("💽 Disks\n\n❌ No disks found", reply_markup=system_info_kb)
        return

    await msg.answer("🧪 Starting SMART short tests…", reply_markup=system_info_kb)

    def describe(output: str) -> str:
        # Use smartctl's first "Please wait" / "will complete" hint if present.
        hint = next(
            (
                text.strip()
                for text in output.splitlines()
                if "Please wait" in text or "will complete" in text
            ),
            None,
        )
        return hint if hint is not None else "started"

    async def run_tests():
        report = ["🧪 SMART short tests\n"]
        for device in devices:
            rc, output = await run_cmd(["sudo", "smartctl", "-t", "short", device], timeout=20)
            if rc == 0:
                report.append(f"🟢 {device}: {describe(output)}")
            else:
                report.append(f"🔴 {device}: {output.strip() or 'error'}")
        await msg.answer("\n".join(report), reply_markup=system_info_kb)

    asyncio.create_task(run_tests())
|
||
|
||
|
||
@dp.message(F.text == "🧪 SMART status")
async def smart_status(msg: Message):
    """Reply with the last SMART self-test result of each disk (admins only)."""
    if not is_admin_msg(msg):
        return

    devices = list_disks()
    if not devices:
        await msg.answer("💽 Disks\n\n❌ No disks found", reply_markup=system_info_kb)
        return

    report = ["🧪 SMART last tests\n"]
    report.extend(f"{device}: {smart_last_test(device)}" for device in devices)
    await msg.answer("\n".join(report), reply_markup=system_info_kb)
|
||
|
||
|
||
@dp.message(F.text.in_({"/openwrt", "📡 Full status"}))
async def openwrt_status(msg: Message):
    """Fetch the full OpenWrt status in a background task and send it.

    Any exception raised by ``get_openwrt_status`` is reported in the chat
    rather than propagated.
    """
    if not is_admin_msg(msg):
        return

    await msg.answer("⏳ Checking OpenWrt…", reply_markup=openwrt_kb)

    async def fetch_and_reply():
        try:
            status_text = await get_openwrt_status(cfg)
        except Exception as exc:  # surface any failure to the admin
            status_text = f"⚠️ OpenWrt error: {exc}"
        await msg.answer(status_text, reply_markup=openwrt_kb)

    asyncio.create_task(fetch_and_reply())
|
||
|
||
|
||
@dp.message(F.text == "/openwrt")
async def openwrt_cmd(msg: Message):
    """Delegate the ``/openwrt`` command to :func:`openwrt_status`.

    NOTE(review): ``openwrt_status`` is registered earlier with a filter that
    already contains "/openwrt". If the dispatcher stops at the first matching
    handler, this one is never reached. Confirm, and consider removing it.
    """
    if is_admin_msg(msg):
        await openwrt_status(msg)
|
||
|
||
|
||
@dp.message(F.text == "/openwrt_wan")
async def openwrt_wan(msg: Message):
    """Fetch the OpenWrt WAN view (``mode="wan"``) in a background task and send it."""
    if not is_admin_msg(msg):
        return

    await msg.answer("⏳ Checking OpenWrt WAN…", reply_markup=openwrt_kb)

    async def fetch_and_reply():
        try:
            wan_text = await get_openwrt_status(cfg, mode="wan")
        except Exception as exc:  # surface any failure to the admin
            wan_text = f"⚠️ OpenWrt error: {exc}"
        await msg.answer(wan_text, reply_markup=openwrt_kb)

    asyncio.create_task(fetch_and_reply())
|
||
|
||
|
||
@dp.message(F.text == "📡 OpenWrt")
async def openwrt_menu(msg: Message):
    """Show the OpenWrt sub-menu keyboard (admins only)."""
    if is_admin_msg(msg):
        await msg.answer("📡 OpenWrt menu", reply_markup=openwrt_kb)
|
||
|
||
|
||
@dp.message(F.text.in_({"/openwrt_clients", "📶 Wi-Fi clients"}))
async def openwrt_clients(msg: Message):
    """Fetch the OpenWrt client list (``mode="clients"``) in a background task and send it."""
    if not is_admin_msg(msg):
        return

    await msg.answer("⏳ Checking OpenWrt clients…", reply_markup=openwrt_kb)

    async def fetch_and_reply():
        try:
            clients_text = await get_openwrt_status(cfg, mode="clients")
        except Exception as exc:  # surface any failure to the admin
            clients_text = f"⚠️ OpenWrt error: {exc}"
        await msg.answer(clients_text, reply_markup=openwrt_kb)

    asyncio.create_task(fetch_and_reply())
|
||
|
||
|
||
@dp.message(F.text.in_({"/openwrt_leases", "🧾 Leases"}))
async def openwrt_leases(msg: Message):
    """Fetch the OpenWrt DHCP lease view (``mode="leases"``) in a background task and send it."""
    if not is_admin_msg(msg):
        return

    await msg.answer("⏳ Checking OpenWrt leases…", reply_markup=openwrt_kb)

    async def fetch_and_reply():
        try:
            leases_text = await get_openwrt_status(cfg, mode="leases")
        except Exception as exc:  # surface any failure to the admin
            leases_text = f"⚠️ OpenWrt error: {exc}"
        await msg.answer(leases_text, reply_markup=openwrt_kb)

    asyncio.create_task(fetch_and_reply())
|
||
|
||
|
||
@dp.message(F.text == "/openwrt_fast")
@dp.message(F.text == "🌐 WAN fast")
async def openwrt_fast(msg: Message):
    """Quick WAN check: same ``mode="wan"`` query as ``openwrt_wan``, different progress text."""
    if not is_admin_msg(msg):
        return

    await msg.answer("⏳ OpenWrt fast…", reply_markup=openwrt_kb)

    async def fetch_and_reply():
        try:
            wan_text = await get_openwrt_status(cfg, mode="wan")
        except Exception as exc:  # surface any failure to the admin
            wan_text = f"⚠️ OpenWrt error: {exc}"
        await msg.answer(wan_text, reply_markup=openwrt_kb)

    asyncio.create_task(fetch_and_reply())
|
||
|
||
|
||
@dp.message(F.text == "🧾 Audit")
async def audit_log(msg: Message):
    """Send the last 200 audit entries.

    Warning/info notices (text starting with "⚠️" or "ℹ️") are sent as plain
    text; anything else is sent with Markdown parsing.
    """
    if not is_admin_msg(msg):
        return
    text = read_audit_tail(cfg, limit=200)
    is_notice = text.startswith(("⚠️", "ℹ️"))
    extra = {} if is_notice else {"parse_mode": "Markdown"}
    await msg.answer(text, reply_markup=system_logs_audit_kb, **extra)
|
||
|
||
|
||
@dp.message(F.text == "🌍 External")
async def external_checks(msg: Message):
    """Reply with the external-checks report from ``format_report`` (admins only)."""
    if is_admin_msg(msg):
        report = format_report(cfg)
        await msg.answer(report, reply_markup=system_logs_security_kb)
|
||
|
||
|
||
@dp.message(F.text == "📣 Incidents")
async def incidents(msg: Message):
    """Show incident counts for 24h and 7d, plus up to 30 of the latest 24h entries.

    Replies with a warning when the incidents log file does not exist.
    """
    if not is_admin_msg(msg):
        return

    if not os.path.exists(incidents_path(cfg)):
        await msg.answer("⚠️ Incidents log not found", reply_markup=system_logs_audit_kb)
        return

    day = read_recent(cfg, hours=24, limit=500)
    week = read_recent(cfg, hours=24 * 7, limit=1000)
    shown = day[-30:] if day else ["(no incidents)"]

    sections = [
        "📣 Incidents\n\n",
        f"Last 24h: {len(day)}\n",
        f"Last 7d: {len(week)}\n\n",
        "Recent (24h):\n",
        "```\n" + "\n".join(shown) + "\n```",
    ]
    await msg.answer("".join(sections), reply_markup=system_logs_audit_kb, parse_mode="Markdown")
|
||
|
||
|
||
@dp.message(F.text == "🧾 Incidents")
async def incidents_entry(msg: Message):
    """List the available incident views (admins only)."""
    if not is_admin_msg(msg):
        return
    menu_text = "\n".join(
        [
            "📣 Incidents menu:",
            "- Summary",
            "- Diff",
            "- Heatmap",
            "- Export/All",
            "- Alerts log",
        ]
    )
    await msg.answer(menu_text, reply_markup=system_logs_audit_kb)
|
||
|
||
|
||
@dp.message(F.text.in_({"/incidents_summary", "📣 Summary"}))
async def incidents_summary(msg: Message):
    """Summarize incidents for the last 24 hours and the last 7 days.

    For each window the reply shows the total count, the top-5 categories,
    the top-5 categories among suppressed alerts, and the five categories
    with the most recent timestamps.
    """
    if not is_admin_msg(msg):
        return

    day_rows = read_raw(cfg, hours=24, limit=2000)
    week_rows = read_raw(cfg, hours=24 * 7, limit=4000)

    def top5(counter: dict) -> str:
        # "name:count" for the five largest counts, or "n/a" when empty.
        ranked = sorted(counter.items(), key=lambda kv: kv[1], reverse=True)[:5]
        return ", ".join(f"{name}:{count}" for name, count in ranked) or "n/a"

    def digest(items):
        by_cat: dict = {}
        suppressed_by_cat: dict = {}
        latest: dict = {}
        for stamp, text in items:
            category = infer_category(text) or "n/a"
            by_cat[category] = by_cat.get(category, 0) + 1
            # Later rows overwrite earlier ones for the same category.
            latest[category] = stamp
            if "[suppressed" in text.lower():
                suppressed_by_cat[category] = suppressed_by_cat.get(category, 0) + 1
        newest = sorted(latest.items(), key=lambda kv: kv[1], reverse=True)[:5]
        last_str = ", ".join(
            f"{name}:{stamp.astimezone().strftime('%Y-%m-%d %H:%M')}" for name, stamp in newest
        ) or "n/a"
        return len(items), top5(by_cat), top5(suppressed_by_cat), last_str

    t24, top24, supp24, last24 = digest(day_rows)
    t7, top7, supp7, last7 = digest(week_rows)

    summary = "\n".join(
        [
            "📣 Incidents summary\n",
            f"24h: {t24} (top: {top24}; suppressed: {supp24}; last: {last24})",
            f"7d: {t7} (top: {top7}; suppressed: {supp7}; last: {last7})",
        ]
    )
    await msg.answer(summary, reply_markup=system_logs_kb)
|
||
|
||
|
||
@dp.message(F.text.startswith("/incidents_diff"))
@dp.message(F.text == "🆕 Diff")
async def incidents_diff(msg: Message):
    """Show incidents newer than the stored "diff" mark, then advance the mark.

    Command form: ``/incidents_diff [HOURS|reset|clear]``.

    * HOURS (default 24, minimum 1) is the window passed to ``read_raw``.
    * "reset"/"clear" forgets the stored mark.

    The mark is the timestamp of the newest incident returned, persisted
    via ``runtime_state`` under ``"incidents_diff_last_ts"`` as an ISO string.
    The reply is trimmed to stay under Telegram's 4096-character message
    limit; the count in the header always covers all new incidents.
    """
    if not is_admin_msg(msg):
        return

    parts = msg.text.split()
    hours = 24
    reset = False
    if len(parts) >= 2:
        if parts[1].lower() in {"reset", "clear"}:
            reset = True
        else:
            try:
                hours = max(1, int(parts[1]))
            except ValueError:
                # Not a number (e.g. the "🆕 Diff" button label): default window.
                hours = 24

    if reset:
        runtime_state.set_state("incidents_diff_last_ts", None)
        await msg.answer("📣 Diff marker reset. Next run will show all within window.", reply_markup=system_logs_audit_kb)
        return

    last_iso = runtime_state.get("incidents_diff_last_ts")
    last_dt = None
    if isinstance(last_iso, str):
        try:
            last_dt = datetime.fromisoformat(last_iso)
        except ValueError:
            last_dt = None

    rows = read_raw(cfg, hours=hours, limit=5000, include_old=True)
    fresh: list[tuple[datetime, str]] = [
        (dt, line) for dt, line in rows if not (last_dt and dt <= last_dt)
    ]
    if not fresh:
        note = f"since {last_dt.astimezone().strftime('%Y-%m-%d %H:%M')}" if last_dt else "for the period"
        await msg.answer(f"📣 No new incidents {note} (window {hours}h)", reply_markup=system_logs_audit_kb)
        return

    fresh.sort(key=lambda x: x[0])

    # Telegram rejects messages longer than 4096 characters, and up to 200
    # incident lines easily exceed that. Keep the newest entries (at most 200)
    # that fit in a conservative budget, leaving room for the header/fences.
    budget = 3500
    shown: list[str] = []
    used = 0
    for dt, line in reversed(fresh[-200:]):
        entry = f"{dt.astimezone().strftime('%m-%d %H:%M')} {line}"[:budget]
        if shown and used + len(entry) + 1 > budget:
            break
        shown.append(entry)
        used += len(entry) + 1
    shown.reverse()
    body = "\n".join(shown)
    trimmed = "" if len(shown) == len(fresh) else f", showing last {len(shown)}"

    await msg.answer(
        f"📣 New incidents (since last mark, window {hours}h): {len(fresh)}{trimmed}\n```\n{body}\n```",
        reply_markup=system_logs_audit_kb,
        parse_mode="Markdown",
    )
    try:
        runtime_state.set_state("incidents_diff_last_ts", fresh[-1][0].isoformat())
    except Exception:
        # Best effort: if the mark cannot be persisted, the next run simply
        # shows these incidents again.
        pass
|
||
|
||
|
||
@dp.message(F.text.startswith("/alerts_heatmap"))
@dp.message(F.text == "🔥 Heatmap")
async def alerts_heatmap(msg: Message):
    """Render an hourly text heatmap of incidents.

    Command form: ``/alerts_heatmap [HOURS] [CATEGORY]`` (any order).

    * A numeric token sets the window (default 48, minimum 1).
    * Any other token filters by category, compared lower-cased against
      ``infer_category`` (uncategorised rows count as "n/a").

    The "🔥 Heatmap" button uses the defaults. At most the last 72 hourly rows
    are shown, with bars scaled to 12 blocks.
    """
    if not is_admin_msg(msg):
        return

    parts = msg.text.split()
    # Only the slash command carries arguments. The button label splits into
    # ["🔥", "Heatmap"]; previously "Heatmap" was taken as a category filter,
    # so the button filtered on a non-existent "heatmap" category.
    args = parts[1:] if parts and parts[0].startswith("/") else []
    hours = 48
    category = None
    for part in args:
        if part.isdecimal():
            hours = max(1, int(part))
        else:
            category = part.lower()

    buckets: dict[datetime, int] = {}
    for dt, line in read_raw(cfg, hours=hours, limit=8000, include_old=True):
        cat = infer_category(line) or "n/a"
        if category and cat != category:
            continue
        hour = dt.astimezone().replace(minute=0, second=0, microsecond=0)
        buckets[hour] = buckets.get(hour, 0) + 1

    label = f"hours={hours}, category={category or 'any'}"
    if not buckets:
        await msg.answer(f"⚠️ No incidents for heatmap ({label})", reply_markup=system_logs_audit_kb)
        return

    now = datetime.now().astimezone()
    current = (now - timedelta(hours=hours - 1)).replace(minute=0, second=0, microsecond=0)
    timeline: list[tuple[datetime, int]] = []
    while current <= now:
        timeline.append((current, buckets.get(current, 0)))
        current += timedelta(hours=1)

    max_count = max(c for _, c in timeline) or 1
    lines = [f"📊 Alerts heatmap ({label})"]
    for ts, cnt in timeline[-72:]:
        # Any non-zero hour gets at least one block so it stays visible.
        bar = "▇" * max(1, int(cnt / max_count * 12)) if cnt else ""
        lines.append(f"{ts:%m-%d %H}: {cnt:>3} {bar}")
    await msg.answer("\n".join(lines), reply_markup=system_logs_audit_kb)
|
||
|
||
|
||
@dp.message(F.text == "/disk_snapshot")
async def disk_snapshot(msg: Message):
    """Build and send a disk report for the mount returned by ``worst_disk_usage``.

    Uses "/" when no mount is reported and 0 when usage is unknown. Errors
    from ``build_disk_report`` are reported in the chat.
    """
    if not is_admin_msg(msg):
        return
    usage, mount = worst_disk_usage()
    target = mount or "/"
    try:
        report = await build_disk_report(cfg, target, usage or 0)
    except Exception as exc:
        await msg.answer(f"⚠️ Disk snapshot error: {exc}")
    else:
        await msg.answer(f"💽 Disk snapshot ({target})\n\n{report}", reply_markup=system_info_kb)
|
||
|
||
|
||
@dp.message(F.text.startswith("/alerts_log"))
async def alerts_log(msg: Message):
    """Report sent vs suppressed alerts over the last HOURS (default 24).

    Command form: ``/alerts_log [HOURS]``. A row counts as suppressed when its
    text contains "[suppressed" (case-insensitive). The last 20 suppressed
    rows are listed.
    """
    if not is_admin_msg(msg):
        return

    args = msg.text.split()[1:]
    hours = 24
    if args:
        try:
            hours = max(1, int(args[0]))
        except ValueError:
            hours = 24

    sent: list = []
    suppressed: list = []
    for dt, text in read_raw(cfg, hours=hours, limit=2000):
        bucket = suppressed if "[suppressed" in text.lower() else sent
        bucket.append((dt, text))

    out = [f"📣 Alerts log ({hours}h)", f"Sent: {len(sent)}, Suppressed: {len(suppressed)}"]
    if suppressed:
        out.append("\nSuppressed:")
        out.extend(f"{dt:%m-%d %H:%M} {text}" for dt, text in suppressed[-20:])
    await msg.answer("\n".join(out), reply_markup=system_logs_audit_kb)
|
||
|
||
|
||
@dp.message(F.text.startswith("/incidents_export"))
@dp.message(F.text == "📤 Export")
async def incidents_export(msg: Message):
    """Send incidents from the last HOURS as a CSV or JSON file.

    Command form: ``/incidents_export [HOURS] [json|csv]``. HOURS defaults to
    24 (minimum 1). Any format other than "json" yields a ";"-separated CSV
    with columns timestamp, category, message.
    """
    if not is_admin_msg(msg):
        return

    args = msg.text.split()[1:]
    hours = 24
    fmt = "csv"
    if args:
        try:
            hours = max(1, int(args[0]))
        except ValueError:
            hours = 24
    if len(args) >= 2:
        fmt = args[1].lower()

    records = [
        {
            "timestamp": dt.astimezone().isoformat(),
            "category": infer_category(line) or "n/a",
            "message": line,
        }
        for dt, line in read_raw(cfg, hours=hours, limit=20000, include_old=False)
    ]

    if fmt == "json":
        file_bytes = json.dumps(records, ensure_ascii=False, indent=2).encode("utf-8")
        fname = f"incidents_{hours}h.json"
    else:
        buf = io.StringIO()
        writer = csv.DictWriter(buf, fieldnames=["timestamp", "category", "message"], delimiter=";")
        writer.writeheader()
        writer.writerows(records)
        file_bytes = buf.getvalue().encode("utf-8")
        fname = f"incidents_{hours}h.csv"

    await msg.answer(f"📤 Incidents export ({hours}h): {len(records)} rows, format {fmt}")
    await msg.answer_document(document=BufferedInputFile(file_bytes, filename=fname))
|
||
|
||
|
||
@dp.message(F.text.in_({"/backup_sla", "📉 Backup SLA"}))
async def backup_sla(msg: Message):
    """Report the age of the newest restic snapshot against the backup SLA.

    The SLA is ``cfg["backup"]["sla_hours"]`` (default 26). Status levels:

    * 🟢 age <= SLA
    * 🟡 age <= 1.5 × SLA, or age unknown
    * 🔴 otherwise

    Restic errors, invalid JSON and an empty snapshot list are reported in
    the chat.
    """
    if not is_admin_msg(msg):
        return
    # ``backup_kb`` is used below but is not among this module's top-level
    # imports, so the handler used to die with NameError. Importing it here
    # limits any failure to this handler.
    # NOTE(review): assumes keyboards.backup_kb exists — confirm.
    import re

    from keyboards import backup_kb

    def parse_time(raw):
        # restic emits RFC 3339 times with up to 9 fractional digits (trailing
        # zeros trimmed) and possibly a "Z" suffix. Before Python 3.11,
        # datetime.fromisoformat accepts neither, so normalise to an explicit
        # offset and exactly 6 fractional digits. Returns None if unparseable.
        if not isinstance(raw, str) or not raw:
            return None
        text = raw.replace("Z", "+00:00")
        m = re.match(r"(.*T\d{2}:\d{2}:\d{2})\.(\d+)(.*)$", text)
        if m:
            head, frac, tail = m.groups()
            text = f"{head}.{(frac + '000000')[:6]}{tail}"
        try:
            return datetime.fromisoformat(text)
        except ValueError:
            return None

    rc, out = await run_cmd_full(["restic", "snapshots", "--json"], use_restic_env=True, timeout=40)
    if rc != 0:
        await msg.answer(f"⚠️ Restic error: {out.strip() or rc}", reply_markup=backup_kb)
        return
    try:
        snaps = json.loads(out)
    except (ValueError, TypeError) as e:
        await msg.answer(f"⚠️ Invalid restic JSON: {e}", reply_markup=backup_kb)
        return
    if not isinstance(snaps, list) or not snaps:
        await msg.answer("⚠️ No snapshots found", reply_markup=backup_kb)
        return

    # Pick the newest snapshot by actual instant; string order is wrong when
    # UTC offsets differ, and a single bad "time" no longer crashes the handler.
    stamps = [parse_time(s.get("time")) for s in snaps if isinstance(s, dict)]
    valid = [dt for dt in stamps if dt is not None]
    last_dt = max(valid, key=lambda d: d.timestamp()) if valid else None
    age_h = None
    if last_dt is not None:
        age_h = (datetime.now(tz=last_dt.tzinfo) - last_dt).total_seconds() / 3600

    sla = float(cfg.get("backup", {}).get("sla_hours", 26))
    if age_h is None:
        status = "🟡"
    elif age_h <= sla:
        status = "🟢"
    elif age_h <= sla * 1.5:
        status = "🟡"
    else:
        status = "🔴"

    last_str = last_dt.astimezone().strftime("%Y-%m-%d %H:%M") if last_dt else "n/a"
    age_str = f"{age_h:.1f}h" if age_h is not None else "n/a"
    await msg.answer(
        f"{status} Backup SLA\n"
        f"Snapshots: {len(snaps)}\n"
        f"Last: {last_str} (age {age_str})\n"
        f"SLA: {sla}h",
        reply_markup=backup_kb,
    )
|
||
|
||
|
||
@dp.message(F.text.startswith("/docker_restarts"))
@dp.message(F.text == "♻️ Restarts")
async def docker_restarts(msg: Message):
    """List docker-restart incidents from the last HOURS (default 168, minimum 1).

    An incident line counts as a restart if it contains "docker_restart". Its
    last whitespace-separated token is used as the container alias for the
    per-alias counts. Up to the 50 most recent lines are shown, trimmed to
    stay under Telegram's 4096-character message limit.
    """
    if not is_admin_msg(msg):
        return

    parts = msg.text.split()
    hours = 168
    if len(parts) >= 2:
        try:
            hours = max(1, int(parts[1]))
        except ValueError:
            # Not a number (e.g. the "♻️ Restarts" button label): default window.
            hours = 168

    rows = read_raw(cfg, hours=hours, limit=5000, include_old=True)
    restarts = [(dt, line) for dt, line in rows if "docker_restart" in line]
    if not restarts:
        await msg.answer(f"🐳 No docker restarts in last {hours}h", reply_markup=docker_kb)
        return

    counts: dict[str, int] = {}
    for _dt, line in restarts:
        tokens = line.split()
        alias = tokens[-1] if len(tokens) >= 2 else "unknown"
        counts[alias] = counts.get(alias, 0) + 1

    def md_escape(text: str) -> str:
        # Outside code entities, legacy Telegram Markdown treats _ * ` [ as
        # markup. Names such as "web_app_1" created unbalanced entities and
        # the send failed, so prefix each with a backslash.
        for ch in ("_", "*", "`", "["):
            text = text.replace(ch, "\\" + ch)
        return text

    top = ", ".join(f"{k}:{v}" for k, v in sorted(counts.items(), key=lambda x: x[1], reverse=True))

    # Drop the oldest of the shown lines until the code block fits a budget
    # that leaves room for the header within the 4096-character limit.
    shown = [f"{dt.astimezone().strftime('%m-%d %H:%M')} {line}" for dt, line in restarts[-50:]]
    while len(shown) > 1 and sum(len(s) + 1 for s in shown) > 3000:
        shown.pop(0)
    body = "\n".join(shown)

    await msg.answer(
        f"🐳 Docker restarts ({hours}h): {len(restarts)} ({md_escape(top)})\n```\n{body}\n```",
        reply_markup=docker_kb,
        parse_mode="Markdown",
    )
|
||
|
||
|
||
@dp.message(F.text.startswith("/openwrt_leases_diff"))
@dp.message(F.text == "🔀 Leases diff")
async def openwrt_leases_diff(msg: Message):
    """Compare current OpenWrt leases with the previously stored set.

    On the first run (or when the stored set is empty) the current leases
    are saved as a baseline. Otherwise added and removed leases are listed
    (up to 50 of each), and the stored set is replaced with the current one.
    The work runs in a background task.
    """
    if not is_admin_msg(msg):
        return

    await msg.answer("⏳ OpenWrt leases diff…", reply_markup=openwrt_kb)

    async def diff_and_reply():
        try:
            current = await fetch_openwrt_leases(cfg)
        except Exception as exc:
            await msg.answer(f"⚠️ OpenWrt error: {exc}", reply_markup=openwrt_kb)
            return

        previous = runtime_state.get("openwrt_leases_prev", [])
        if not previous:
            runtime_state.set_state("openwrt_leases_prev", current)
            await msg.answer(f"Baseline saved ({len(current)} leases)", reply_markup=openwrt_kb)
            return

        old, new = set(previous), set(current)
        added = sorted(new - old)
        removed = sorted(old - new)

        report = [f"🧾 OpenWrt leases diff ({len(added)} added, {len(removed)} removed)"]
        for title, items in (("➕ Added:", added), ("➖ Removed:", removed)):
            if items:
                report.append(title)
                report.extend(f" - {item}" for item in items[:50])
        if max(len(added), len(removed)) > 50:
            report.append("… trimmed")

        runtime_state.set_state("openwrt_leases_prev", current)
        await msg.answer("\n".join(report), reply_markup=openwrt_kb)

    asyncio.create_task(diff_and_reply())
|
||
|
||
|
||
@dp.message(F.text.in_({"/queue_sla", "📊 Queue SLA"}))
async def queue_sla(msg: Message):
    """Report wait and run-time statistics for the job-queue history.

    Shows the sample count, max and p95 of ``wait_sec``/``runtime_sec``, and
    the averages from ``get_stats()`` when available.
    """
    if not is_admin_msg(msg):
        return
    # ``backup_kb`` is used below but is not among this module's top-level
    # imports, so the handler used to die with NameError.
    # NOTE(review): assumes keyboards.backup_kb exists — confirm.
    from keyboards import backup_kb

    hist = get_history_raw()
    if not hist:
        await msg.answer("🧾 Queue SLA: history is empty", reply_markup=backup_kb)
        return

    def seconds(item, key):
        # Missing keys count as 0, and so do explicit None values, which
        # would otherwise make max()/sorted() raise TypeError.
        value = item.get(key, 0)
        return 0 if value is None else value

    waits = [seconds(item, "wait_sec") for item in hist]
    runs = [seconds(item, "runtime_sec") for item in hist]

    def p95(values: list[int]) -> float:
        """Nearest-rank (rounded down) 95th percentile; 0.0 for an empty list."""
        if not values:
            return 0.0
        vals = sorted(values)
        k = int(0.95 * (len(vals) - 1))
        return float(vals[k])

    lines = [
        "🧾 Queue SLA",
        f"Samples: {len(hist)}",
        f"Max wait: {max(waits)}s, p95 {p95(waits):.1f}s",
        f"Max run: {max(runs)}s, p95 {p95(runs):.1f}s",
    ]
    stats = get_stats()
    if stats:
        lines.append(
            f"Avg wait: {stats.get('avg_wait_sec', 0):.1f}s, "
            f"avg run: {stats.get('avg_runtime_sec', 0):.1f}s"
        )
    await msg.answer("\n".join(lines), reply_markup=backup_kb)
|
||
|
||
|
||
@dp.message(F.text == "/selftest_history")
async def selftest_history(msg: Message):
    """List up to the first 15 entries of the stored self-test history.

    Each entry shows 🟢/🔴 from its "ok" flag, the first 16 characters of
    "ts", and its "summary".
    """
    if not is_admin_msg(msg):
        return

    history = runtime_state.get("selftest_history", [])
    if not history:
        await msg.answer("🧪 Self-test history is empty", reply_markup=system_logs_audit_kb)
        return

    rows = ["🧪 Self-test history"]
    for entry in history[:15]:
        marker = "🟢" if entry.get("ok", False) else "🔴"
        rows.append(f"{marker} {entry.get('ts', '')[:16]} {entry.get('summary', '')}")
    await msg.answer("\n".join(rows), reply_markup=system_logs_audit_kb)
|
||
|
||
|
||
@dp.message(F.text.startswith("/export_all"))
@dp.message(F.text == "📦 Export all")
async def export_all(msg: Message):
    """Send a ZIP with incidents, queue history and self-test history as CSV files.

    Command form: ``/export_all [HOURS]``. HOURS (default 168, minimum 1) is
    the incident window; the queue and self-test histories are exported in
    full. Every CSV uses ";" as the delimiter.
    """
    if not is_admin_msg(msg):
        return

    args = msg.text.split()[1:]
    hours = 168
    if args:
        try:
            hours = max(1, int(args[0]))
        except ValueError:
            hours = 168

    def as_csv(header, rows) -> str:
        # Serialise *header* followed by *rows* into a ";"-delimited string.
        buf = io.StringIO()
        writer = csv.writer(buf, delimiter=";")
        writer.writerow(header)
        writer.writerows(rows)
        return buf.getvalue()

    def finished_iso(item) -> str:
        # "finished_at" as a local ISO timestamp; "" when missing or falsy.
        stamp = item.get("finished_at")
        return datetime.fromtimestamp(stamp).isoformat() if stamp else ""

    incident_rows = read_raw(cfg, hours=hours, limit=20000, include_old=True)
    incidents_csv = as_csv(
        ["timestamp", "category", "message"],
        (
            [dt.astimezone().isoformat(), infer_category(line) or "n/a", line]
            for dt, line in incident_rows
        ),
    )

    queue_rows = get_history_raw()
    queue_csv = as_csv(
        ["finished_at", "label", "status", "wait_sec", "runtime_sec"],
        (
            [
                finished_iso(item),
                item.get("label", ""),
                item.get("status", ""),
                item.get("wait_sec", ""),
                item.get("runtime_sec", ""),
            ]
            for item in queue_rows
        ),
    )

    selftests = runtime_state.get("selftest_history", [])
    selftest_csv = as_csv(
        ["timestamp", "ok", "summary"],
        ([e.get("ts", ""), e.get("ok", False), e.get("summary", "")] for e in selftests),
    )

    archive = io.BytesIO()
    with zipfile.ZipFile(archive, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        zf.writestr(f"incidents_{hours}h.csv", incidents_csv)
        zf.writestr("queue_history.csv", queue_csv)
        zf.writestr("selftest_history.csv", selftest_csv)

    summary = "\n".join(
        [
            "📦 Export all",
            f"Incidents: {len(incident_rows)} rows ({hours}h)",
            f"Queue history: {len(queue_rows)} rows",
            f"Selftest: {len(selftests)} rows",
        ]
    )
    await msg.answer(summary)
    await msg.answer_document(document=BufferedInputFile(archive.getvalue(), filename="export_all.zip"))
|
||
|
||
|
||
@dp.message(F.text == "🔒 SSL")
async def ssl_certs(msg: Message):
    """Fetch NPMplus certificates in a worker thread and send the formatted list.

    Any error during fetch or formatting is reported in the chat.
    """
    if not is_admin_msg(msg):
        return

    await msg.answer("⏳ Checking SSL certificates…", reply_markup=system_logs_security_kb)

    async def fetch_and_reply():
        try:
            reply = format_certificates(await asyncio.to_thread(fetch_certificates, cfg))
        except Exception as exc:  # surface any failure to the admin
            reply = f"⚠️ NPMplus error: {exc}"
        await msg.answer(reply, reply_markup=system_logs_security_kb)

    asyncio.create_task(fetch_and_reply())
|
||
|
||
|
||
@dp.message(F.text == "🍵 Gitea")
async def gitea_health(msg: Message):
    """Run ``get_gitea_health`` in a worker thread and send its text."""
    if not is_admin_msg(msg):
        return

    await msg.answer("⏳ Checking Gitea health…", reply_markup=system_logs_integrations_kb)

    async def fetch_and_reply():
        try:
            reply = await asyncio.to_thread(get_gitea_health, cfg)
        except Exception as exc:  # surface any failure to the admin
            reply = f"⚠️ Gitea error: {exc}"
        await msg.answer(reply, reply_markup=system_logs_integrations_kb)

    asyncio.create_task(fetch_and_reply())
|
||
|
||
|
||
@dp.message(F.text == "🧩 NPMplus")
async def npmplus_hosts(msg: Message):
    """List up to 10 NPMplus proxy hosts with enable/disable inline buttons.

    Each host with an "id" gets a button whose callback data is
    ``npmplus:<disable|enable>:<id>`` (the action is the opposite of its
    current state). The listing runs in a background task.
    """
    if not is_admin_msg(msg):
        return

    await msg.answer("⏳ Loading NPMplus hosts…", reply_markup=system_logs_integrations_kb)

    async def list_and_reply():
        try:
            hosts = await asyncio.to_thread(list_proxy_hosts, cfg)
        except Exception as exc:
            await msg.answer(f"⚠️ NPMplus error: {exc}", reply_markup=system_logs_integrations_kb)
            return

        if not hosts:
            await msg.answer("🧩 NPMplus\n\n(no proxy hosts)", reply_markup=system_logs_integrations_kb)
            return

        text_lines = ["🧩 NPMplus proxy hosts\n"]
        buttons = []
        for host in hosts[:10]:
            domains = host.get("domain_names") or []
            label = ", ".join(domains) if isinstance(domains, list) else str(domains)
            enabled = host.get("enabled", True)
            target = f"{host.get('forward_host')}:{host.get('forward_port')}"
            text_lines.append(f"{'🟢' if enabled else '🔴'} {label} → {target}")

            host_id = host.get("id")
            if host_id is None:
                continue
            action, icon = ("disable", "⛔") if enabled else ("enable", "✅")
            buttons.append(
                [
                    InlineKeyboardButton(
                        text=f"{icon} {label[:12]}",
                        callback_data=f"npmplus:{action}:{host_id}",
                    )
                ]
            )

        await msg.answer("\n".join(text_lines), reply_markup=InlineKeyboardMarkup(inline_keyboard=buttons))

    asyncio.create_task(list_and_reply())
|
||
|
||
|
||
@dp.message(F.text == "📦 Updates")
async def updates_list(msg: Message):
    """Queue a job that lists pending package updates and shows the first page.

    The job caches the result in ``UPDATES_CACHE`` under the sender's user
    id (20 lines per page) so that pagination callbacks can reuse it.
    """
    if not is_admin_msg(msg):
        return

    async def fetch_updates():
        title, lines = await list_updates()
        UPDATES_CACHE[msg.from_user.id] = {"title": title, "lines": lines, "page_size": 20}
        await send_updates_page(msg, msg.from_user.id, 0, edit=False)

    position = await enqueue("pkg-updates", fetch_updates)
    await msg.answer(f"🕓 Updates queued (#{position})", reply_markup=system_ops_kb)
|
||
|
||
|
||
@dp.message(F.text == "⬆️ Upgrade")
async def updates_apply(msg: Message):
    """Ask for confirmation before upgrading packages (upgrade:confirm / upgrade:cancel)."""
    if not is_admin_msg(msg):
        return
    confirm = InlineKeyboardButton(text="✅ Confirm", callback_data="upgrade:confirm")
    cancel = InlineKeyboardButton(text="✖ Cancel", callback_data="upgrade:cancel")
    prompt_kb = InlineKeyboardMarkup(inline_keyboard=[[confirm, cancel]])
    await msg.answer("⚠️ Confirm package upgrade?", reply_markup=prompt_kb)
|
||
|
||
|
||
@dp.message(F.text == "🔄 Reboot")
async def reboot_request(msg: Message):
    """Ask for confirmation before rebooting (reboot:confirm / reboot:cancel)."""
    if not is_admin_msg(msg):
        return
    confirm = InlineKeyboardButton(text="✅ Confirm", callback_data="reboot:confirm")
    cancel = InlineKeyboardButton(text="✖ Cancel", callback_data="reboot:cancel")
    prompt_kb = InlineKeyboardMarkup(inline_keyboard=[[confirm, cancel]])
    await msg.answer("⚠️ Confirm reboot?", reply_markup=prompt_kb)
|
||
|
||
|
||
@dp.message(F.text == "🧱 Hardware")
async def hw(msg: Message):
    """Reply with the report produced by ``system_checks.hardware`` (admins only)."""
    if not is_admin_msg(msg):
        return
    report = hardware()
    await msg.answer(report, reply_markup=system_info_kb)
|
||
|
||
|
||
def _updates_kb(page: int, total_pages: int) -> InlineKeyboardMarkup:
    """Build the Prev/Next pagination keyboard for the updates list.

    Produces at most one row. It is empty when there is a single page, and
    otherwise holds whichever neighbour buttons exist. Callback data is
    ``updpage:<target page>``.
    """
    nav = []
    if total_pages > 1:
        if page > 0:
            nav.append(InlineKeyboardButton(text="⬅️ Prev", callback_data=f"updpage:{page-1}"))
        if page < total_pages - 1:
            nav.append(InlineKeyboardButton(text="Next ➡️", callback_data=f"updpage:{page+1}"))
    return InlineKeyboardMarkup(inline_keyboard=[nav] if nav else [])
|
||
|
||
|
||
async def send_updates_page(msg: Message, user_id: int, page: int, edit: bool):
    """Render one page of *user_id*'s cached update list.

    *page* is clamped to the valid range. With ``edit=True`` the given
    message is edited in place; otherwise a new message is sent. Replies
    with a warning if nothing is cached for the user.
    """
    cached = UPDATES_CACHE.get(user_id)
    if not cached:
        await msg.answer("⚠️ Updates cache empty", reply_markup=system_ops_kb)
        return

    lines = cached["lines"]
    size = cached["page_size"]
    pages = max(1, -(-len(lines) // size))  # ceiling division, at least one page
    page = min(max(page, 0), pages - 1)

    first = page * size
    chunk = "\n".join(lines[first:first + size])
    text = f"{cached['title']} (page {page+1}/{pages})\n```\n{chunk}\n```"

    send = msg.edit_text if edit else msg.answer
    await send(text, reply_markup=_updates_kb(page, pages), parse_mode="Markdown")
|
||
|
||
|
||
@dp.callback_query(F.data.startswith("updpage:"))
async def updates_page(cb: CallbackQuery):
    """Handle Prev/Next presses by editing the message to show the requested page."""
    uid = cb.from_user.id
    if uid not in UPDATES_CACHE:
        await cb.answer("No cached updates")
        return

    _, _, raw_page = cb.data.partition(":")
    try:
        target = int(raw_page)
    except ValueError:
        await cb.answer("Bad page")
        return

    await cb.answer()
    await send_updates_page(cb.message, uid, target, edit=True)
|
||
|
||
|
||
@dp.callback_query(F.data == "upgrade:confirm")
async def upgrade_confirm(cb: CallbackQuery):
    """Queue the package upgrade after the admin confirms.

    Presses from anyone other than ``cfg["telegram"]["admin_id"]`` are
    ignored. When ``safety.dry_run`` is set, the queued job only reports that
    the upgrade was skipped.
    """
    if cb.from_user.id != cfg["telegram"]["admin_id"]:
        return
    await cb.answer()
    chat = cb.message

    async def run_upgrade():
        if cfg.get("safety", {}).get("dry_run", False):
            await chat.answer("🧪 Dry-run: upgrade skipped", reply_markup=system_ops_kb)
            return
        result = await apply_updates()
        await chat.answer(result, reply_markup=system_ops_kb, parse_mode="Markdown")

    position = await enqueue("pkg-upgrade", run_upgrade)
    await chat.answer(f"🕓 Upgrade queued (#{position})", reply_markup=system_ops_kb)
|
||
|
||
|
||
@dp.callback_query(F.data == "upgrade:cancel")
async def upgrade_cancel(cb: CallbackQuery):
    """Dismiss the upgrade confirmation prompt (no admin check is performed)."""
    await cb.answer("Cancelled")
    prompt = cb.message
    await prompt.delete()
|
||
|
||
|
||
@dp.callback_query(F.data == "reboot:confirm")
async def reboot_confirm(cb: CallbackQuery):
    """Arm the reboot password step for the admin who confirmed.

    Adds the user to ``REBOOT_PENDING``. The ``reboot_password`` handler
    filters on that mapping, so it picks up this user's next text message
    (unless an earlier-registered handler matches it first).
    """
    admin_id = cfg["telegram"]["admin_id"]
    if cb.from_user.id != admin_id:
        return
    await cb.answer()
    REBOOT_PENDING[cb.from_user.id] = {}
    await cb.message.answer("🔐 Send reboot password", reply_markup=system_ops_kb)
|
||
|
||
|
||
@dp.callback_query(F.data == "reboot:cancel")
async def reboot_cancel(cb: CallbackQuery):
    """Dismiss the reboot confirmation prompt (no admin check is performed)."""
    await cb.answer("Cancelled")
    prompt = cb.message
    await prompt.delete()
|
||
|
||
|
||
@dp.callback_query(F.data.startswith("npmplus:"))
async def npmplus_toggle(cb: CallbackQuery):
    """Enable or disable an NPMplus proxy host from an inline button.

    Expects callback data ``npmplus:<enable|disable>:<int id>``. Any action
    other than "enable" disables the host. Non-admin presses are ignored.
    """
    if cb.from_user.id != cfg["telegram"]["admin_id"]:
        return

    pieces = cb.data.split(":")
    if len(pieces) != 3:
        await cb.answer("Bad request")
        return
    _, action, raw_id = pieces

    try:
        host_id = int(raw_id)
    except ValueError:
        await cb.answer("Bad id")
        return

    await cb.answer("Working…")
    ok, info = await asyncio.to_thread(set_proxy_host, cfg, host_id, action == "enable")
    reply = "✅ Updated" if ok else f"❌ NPMplus error: {info}"
    await cb.message.answer(reply, reply_markup=system_logs_integrations_kb)
|
||
|
||
|
||
@dp.message(F.text, F.func(lambda msg: msg.from_user and msg.from_user.id in REBOOT_PENDING))
async def reboot_password(msg: Message):
    """Check the password sent after a confirmed reboot request.

    The pending flag is cleared on the first message whatever the outcome,
    so a wrong password means starting over. On a match the reboot is
    enqueued; when ``safety.dry_run`` is set the job only reports the skip.
    """
    if not is_admin_msg(msg):
        return
    import hmac

    REBOOT_PENDING.pop(msg.from_user.id, None)

    expected = cfg.get("security", {}).get("reboot_password")
    if not expected:
        await msg.answer("⚠️ Reboot password not configured", reply_markup=system_ops_kb)
        return

    supplied = (msg.text or "").strip()
    # str(): the configured value is not guaranteed to be a string (e.g. a
    # purely numeric password may load as an int) and then could never match.
    # compare_digest on UTF-8 bytes gives a constant-time comparison and also
    # works for non-ASCII passwords.
    if not hmac.compare_digest(supplied.encode("utf-8"), str(expected).encode("utf-8")):
        await msg.answer("❌ Wrong password", reply_markup=system_ops_kb)
        return

    async def job():
        if cfg.get("safety", {}).get("dry_run", False):
            await msg.answer("🧪 Dry-run: reboot skipped", reply_markup=system_ops_kb)
            return
        await msg.answer("🔄 Rebooting…", reply_markup=system_ops_kb)
        await run_cmd(["sudo", "reboot"], timeout=10)

    pos = await enqueue("reboot", job)
    await msg.answer(f"🕓 Reboot queued (#{pos})", reply_markup=system_ops_kb)
|