Files
tg-admin-bot/handlers/system.py

1096 lines
37 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import os
from datetime import datetime, timezone, timedelta
from aiogram import F
from aiogram.types import Message, CallbackQuery, InlineKeyboardMarkup, InlineKeyboardButton, InputFile, BufferedInputFile
from app import dp, cfg
from auth import is_admin_msg
from keyboards import (
system_info_kb,
system_ops_kb,
system_logs_audit_kb,
system_logs_security_kb,
system_logs_integrations_kb,
system_logs_kb,
openwrt_kb,
docker_kb,
)
from system_checks import security, disks, hardware, list_disks, smart_last_test
from services.http_checks import get_url_checks, check_url
from services.queue import enqueue, get_history_raw, get_stats
from services.updates import list_updates, apply_updates
from services.runner import run_cmd, run_cmd_full
from services.npmplus import fetch_certificates, format_certificates, list_proxy_hosts, set_proxy_host
from services.gitea import get_gitea_health
from services.openwrt import get_openwrt_status, fetch_openwrt_leases
from services.system import worst_disk_usage
import state
from state import UPDATES_CACHE, REBOOT_PENDING
from services.metrics import summarize
from services.audit import read_audit_tail
from services.incidents import read_recent, incidents_path, read_raw, infer_category
from services.external_checks import format_report
from services.disk_report import build_disk_report
from services import runtime_state
import io
import json
import csv
import zipfile
@dp.message(F.text == "💽 Disks")
async def sd(msg: Message):
    """Answer the "💽 Disks" button with the output of ``disks()``.

    Senders that do not pass ``is_admin_msg`` get no reply.
    """
    if not is_admin_msg(msg):
        return
    await msg.answer(disks(), reply_markup=system_info_kb)
@dp.message(F.text == "🔐 Security")
async def sec(msg: Message):
    """Answer the "🔐 Security" button with the output of ``security()``.

    Senders that do not pass ``is_admin_msg`` get no reply.
    """
    if not is_admin_msg(msg):
        return
    await msg.answer(security(), reply_markup=system_info_kb)
@dp.message(F.text == "🌐 URLs")
async def urls(msg: Message):
    """Check every configured URL and report one status line per alias.

    The checks run in a background task: ``check_url`` is executed in worker
    threads via ``asyncio.to_thread`` and the summary is sent once all of them
    have finished.
    """
    if not is_admin_msg(msg):
        return

    targets = list(get_url_checks(cfg))
    if not targets:
        await msg.answer("⚠️ Нет URL для проверки", reply_markup=system_logs_security_kb)
        return
    await msg.answer("⏳ Проверяю URL…", reply_markup=system_logs_security_kb)

    def render(alias, outcome):
        # Each outcome is an (ok, status, ms, err) tuple returned by check_url.
        ok, status, ms, err = outcome
        if ok:
            return f"🟢 {alias}: {status} ({ms}ms)"
        if status is not None:
            return f"🔴 {alias}: {status} ({ms}ms)"
        reason = err or "error"
        return f"🔴 {alias}: {reason} ({ms}ms)"

    async def worker():
        pending = [asyncio.to_thread(check_url, url) for _, url in targets]
        outcomes = await asyncio.gather(*pending)
        report = ["🌐 URLs\n"]
        for (alias, _url), outcome in zip(targets, outcomes):
            report.append(render(alias, outcome))
        await msg.answer("\n".join(report), reply_markup=system_logs_security_kb)

    # Fire-and-forget: the handler returns while the checks are still running.
    asyncio.create_task(worker())
@dp.message(F.text == "🔑 SSH log")
async def ssh_log(msg: Message):
    """Show up to the 30 most recent "Accepted" SSH log lines.

    ``journalctl`` (identifiers ``sshd`` and ``sshd-session``, since 00:00) is
    queried first; when it yields no matching line, the last 200 lines of
    ``/var/log/auth.log`` are scanned instead.
    """
    if not is_admin_msg(msg):
        return
    await msg.answer("⏳ Loading SSH logins…", reply_markup=system_logs_security_kb)

    def accepted(rc, out):
        # Keep only successful-login lines from a command that exited with 0.
        if rc != 0 or not out.strip():
            return []
        return [entry for entry in out.splitlines() if "Accepted" in entry]

    async def worker():
        journal_cmd = [
            "journalctl",
            "-t",
            "sshd",
            "-t",
            "sshd-session",
            "--since",
            "00:00",
            "--no-pager",
        ]
        rc, out = await run_cmd(journal_cmd, timeout=20)
        logins: list[str] = accepted(rc, out)

        if not logins:
            rc2, out2 = await run_cmd(["tail", "-n", "200", "/var/log/auth.log"], timeout=10)
            logins = accepted(rc2, out2)

        if not logins:
            await msg.answer("🔑 SSH log\n\n(no logins today)", reply_markup=system_logs_security_kb)
            return

        shown = [_format_ssh_line(entry) for entry in logins[-30:]]
        text = "🔑 SSH logins (today)\n```\n" + "\n".join(shown) + "\n```"
        await msg.answer(text, reply_markup=system_logs_security_kb, parse_mode="Markdown")

    asyncio.create_task(worker())
def _format_ssh_line(line: str) -> str:
parts = line.split()
if len(parts) < 10:
return line
month, day, time_str = parts[0], parts[1], parts[2]
user = "?"
ip = "?"
key_type = "?"
if "for" in parts:
i = parts.index("for")
if i + 1 < len(parts):
user = parts[i + 1]
if "from" in parts:
i = parts.index("from")
if i + 1 < len(parts):
ip = parts[i + 1]
if "ssh2:" in parts:
i = parts.index("ssh2:")
if i + 1 < len(parts):
key_type = parts[i + 1]
return f"{month} {day} {time_str} {user} @ {ip} ({key_type})"
@dp.message(F.text == "📈 Metrics")
async def metrics(msg: Message):
    """Send a 15-minute summary of ``state.METRICS_STORE``.

    Replies with a warning instead when the store is ``None``.
    """
    if not is_admin_msg(msg):
        return

    store = state.METRICS_STORE
    if store is None:
        await msg.answer("⚠️ Metrics not initialized", reply_markup=system_info_kb)
        return

    report = summarize(store, minutes=15)
    await msg.answer(report, reply_markup=system_info_kb)
@dp.message(F.text == "🧪 SMART test")
async def smart_test(msg: Message):
    """Start a SMART short test on every device returned by ``list_disks()``.

    ``sudo smartctl -t short <dev>`` is run for each device in a background
    task; the reply lists one line per device with either the error output or
    the first output line mentioning "Please wait" / "will complete".
    """
    if not is_admin_msg(msg):
        return

    devices = list_disks()
    if not devices:
        await msg.answer("💽 Disks\n\n❌ No disks found", reply_markup=system_info_kb)
        return
    await msg.answer("🧪 Starting SMART short tests…", reply_markup=system_info_kb)

    async def worker():
        report = ["🧪 SMART short tests\n"]
        for dev in devices:
            rc, out = await run_cmd(["sudo", "smartctl", "-t", "short", dev], timeout=20)
            if rc != 0:
                report.append(f"🔴 {dev}: {out.strip() or 'error'}")
                continue
            # First progress hint printed by smartctl, else a generic marker.
            hints = (
                row.strip()
                for row in out.splitlines()
                if "Please wait" in row or "will complete" in row
            )
            summary = next(hints, "started")
            report.append(f"🟢 {dev}: {summary}")
        await msg.answer("\n".join(report), reply_markup=system_info_kb)

    asyncio.create_task(worker())
@dp.message(F.text == "🧪 SMART status")
async def smart_status(msg: Message):
    """List ``smart_last_test(dev)`` for every device from ``list_disks()``."""
    if not is_admin_msg(msg):
        return

    devices = list_disks()
    if not devices:
        await msg.answer("💽 Disks\n\n❌ No disks found", reply_markup=system_info_kb)
        return

    report = ["🧪 SMART last tests\n"]
    report.extend(f"{dev}: {smart_last_test(dev)}" for dev in devices)
    await msg.answer("\n".join(report), reply_markup=system_info_kb)
@dp.message(F.text.in_({"/openwrt", "📡 Full status"}))
async def openwrt_status(msg: Message):
    """Fetch the default OpenWrt status report in a background task.

    Any exception raised by ``get_openwrt_status`` is reported to the chat as
    an error line instead of propagating.
    """
    if not is_admin_msg(msg):
        return
    await msg.answer("⏳ Checking OpenWrt…", reply_markup=openwrt_kb)

    async def worker():
        try:
            report = await get_openwrt_status(cfg)
        except Exception as exc:
            report = f"⚠️ OpenWrt error: {exc}"
        await msg.answer(report, reply_markup=openwrt_kb)

    asyncio.create_task(worker())
@dp.message(F.text == "/openwrt")
async def openwrt_cmd(msg: Message):
    """Delegate the ``/openwrt`` command to ``openwrt_status``.

    NOTE(review): the filter on ``openwrt_status`` also lists "/openwrt", so
    this registration looks redundant — confirm which handler the dispatcher
    actually selects.
    """
    if is_admin_msg(msg):
        await openwrt_status(msg)
@dp.message(F.text == "/openwrt_wan")
async def openwrt_wan(msg: Message):
    """Fetch the OpenWrt report in ``mode="wan"`` in a background task."""
    if not is_admin_msg(msg):
        return
    await msg.answer("⏳ Checking OpenWrt WAN…", reply_markup=openwrt_kb)

    async def worker():
        try:
            report = await get_openwrt_status(cfg, mode="wan")
        except Exception as exc:
            # Surface the failure in chat rather than losing it in the task.
            report = f"⚠️ OpenWrt error: {exc}"
        await msg.answer(report, reply_markup=openwrt_kb)

    asyncio.create_task(worker())
@dp.message(F.text == "📡 OpenWrt")
async def openwrt_menu(msg: Message):
    """Open the OpenWrt menu by replying with ``openwrt_kb``."""
    if is_admin_msg(msg):
        await msg.answer("📡 OpenWrt menu", reply_markup=openwrt_kb)
@dp.message(F.text.in_({"/openwrt_clients", "📶 Wi-Fi clients"}))
async def openwrt_clients(msg: Message):
    """Fetch the OpenWrt report in ``mode="clients"`` in a background task."""
    if not is_admin_msg(msg):
        return
    await msg.answer("⏳ Checking OpenWrt clients…", reply_markup=openwrt_kb)

    async def worker():
        try:
            report = await get_openwrt_status(cfg, mode="clients")
        except Exception as exc:
            report = f"⚠️ OpenWrt error: {exc}"
        await msg.answer(report, reply_markup=openwrt_kb)

    asyncio.create_task(worker())
@dp.message(F.text.in_({"/openwrt_leases", "🧾 Leases"}))
async def openwrt_leases(msg: Message):
    """Fetch the OpenWrt report in ``mode="leases"`` in a background task."""
    if not is_admin_msg(msg):
        return
    await msg.answer("⏳ Checking OpenWrt leases…", reply_markup=openwrt_kb)

    async def worker():
        try:
            report = await get_openwrt_status(cfg, mode="leases")
        except Exception as exc:
            report = f"⚠️ OpenWrt error: {exc}"
        await msg.answer(report, reply_markup=openwrt_kb)

    asyncio.create_task(worker())
@dp.message(F.text == "/openwrt_fast")
@dp.message(F.text == "🌐 WAN fast")
async def openwrt_fast(msg: Message):
    """Fetch the OpenWrt report in ``mode="wan"`` for the "fast" shortcut.

    Uses the same ``get_openwrt_status`` mode as ``/openwrt_wan``; only the
    progress message differs.
    """
    if not is_admin_msg(msg):
        return
    await msg.answer("⏳ OpenWrt fast…", reply_markup=openwrt_kb)

    async def worker():
        try:
            report = await get_openwrt_status(cfg, mode="wan")
        except Exception as exc:
            report = f"⚠️ OpenWrt error: {exc}"
        await msg.answer(report, reply_markup=openwrt_kb)

    asyncio.create_task(worker())
@dp.message(F.text == "🧾 Audit")
async def audit_log(msg: Message):
    """Send the last 200 audit entries returned by ``read_audit_tail``.

    The text is sent without a parse mode when it starts with one of the
    "plain" prefixes, otherwise as Markdown.
    """
    if not is_admin_msg(msg):
        return

    tail = read_audit_tail(cfg, limit=200)
    # NOTE(review): startswith("") is true for every string, so as written the
    # Markdown branch is never taken. The second prefix may have lost a
    # character somewhere — confirm the intended literal.
    send_plain = tail.startswith("⚠️") or tail.startswith("")
    extra = {} if send_plain else {"parse_mode": "Markdown"}
    await msg.answer(tail, reply_markup=system_logs_audit_kb, **extra)
@dp.message(F.text == "🌍 External")
async def external_checks(msg: Message):
    """Reply with the report produced by ``format_report(cfg)``."""
    if is_admin_msg(msg):
        await msg.answer(format_report(cfg), reply_markup=system_logs_security_kb)
@dp.message(F.text == "📣 Incidents")
async def incidents(msg: Message):
    """Show incident counts for 24h / 7d and the last 30 entries of the day.

    Replies with a warning when the incidents log file does not exist.
    """
    if not is_admin_msg(msg):
        return

    if not os.path.exists(incidents_path(cfg)):
        await msg.answer("⚠️ Incidents log not found", reply_markup=system_logs_audit_kb)
        return

    day = read_recent(cfg, hours=24, limit=500)
    week = read_recent(cfg, hours=24 * 7, limit=1000)

    tail = day[-30:] if day else ["(no incidents)"]
    body = "\n".join(tail)
    header = (
        "📣 Incidents\n\n"
        f"Last 24h: {len(day)}\n"
        f"Last 7d: {len(week)}\n\n"
        "Recent (24h):\n"
    )
    await msg.answer(
        header + f"```\n{body}\n```",
        reply_markup=system_logs_audit_kb,
        parse_mode="Markdown",
    )
@dp.message(F.text == "🧾 Incidents")
async def incidents_entry(msg: Message):
    """Show the static incidents menu text with ``system_logs_audit_kb``."""
    if not is_admin_msg(msg):
        return

    menu = (
        "📣 Incidents menu:\n"
        "- Summary\n- Diff\n- Heatmap\n- Export/All\n- Alerts log"
    )
    await msg.answer(menu, reply_markup=system_logs_audit_kb)
@dp.message(F.text.in_({"/incidents_summary", "📣 Summary"}))
async def incidents_summary(msg: Message):
    """Summarise incidents per category for the last 24 hours and 7 days.

    For each window the reply shows the total, the five most frequent
    categories, the five most frequently suppressed categories, and the five
    categories with the newest "last seen" timestamp.
    """
    if not is_admin_msg(msg):
        return

    day_rows = read_raw(cfg, hours=24, limit=2000)
    week_rows = read_raw(cfg, hours=24 * 7, limit=4000)

    def top5(counter):
        # Highest counts first; "n/a" when the counter is empty.
        ranked = sorted(counter.items(), key=lambda kv: kv[1], reverse=True)[:5]
        return ", ".join(f"{k}:{v}" for k, v in ranked) or "n/a"

    def digest(entries):
        per_cat = {}
        per_cat_suppressed = {}
        latest = {}
        for stamp, line in entries:
            cat = infer_category(line) or "n/a"
            per_cat[cat] = per_cat.get(cat, 0) + 1
            # Later rows overwrite earlier ones, so this ends on the last row
            # seen for the category.
            latest[cat] = stamp
            if "[suppressed" in line.lower():
                per_cat_suppressed[cat] = per_cat_suppressed.get(cat, 0) + 1

        newest = sorted(latest.items(), key=lambda kv: kv[1], reverse=True)[:5]
        last_parts = [
            f"{k}:{stamp.astimezone().strftime('%Y-%m-%d %H:%M')}" for k, stamp in newest
        ]
        last_str = ", ".join(last_parts) or "n/a"
        return len(entries), top5(per_cat), top5(per_cat_suppressed), last_str

    t24, top24, supp24, last24 = digest(day_rows)
    t7, top7, supp7, last7 = digest(week_rows)
    text = (
        "📣 Incidents summary\n\n"
        f"24h: {t24} (top: {top24}; suppressed: {supp24}; last: {last24})\n"
        f"7d: {t7} (top: {top7}; suppressed: {supp7}; last: {last7})"
    )
    await msg.answer(text, reply_markup=system_logs_kb)
@dp.message(F.text.startswith("/incidents_diff"))
@dp.message(F.text == "🆕 Diff")
async def incidents_diff(msg: Message):
    """Show incidents newer than the stored "diff" marker, then move the marker.

    Usage: ``/incidents_diff [hours|reset|clear]`` (default window 24h).
    The marker is kept in ``runtime_state`` under ``incidents_diff_last_ts`` as
    an ISO timestamp. Any exception is reported to the chat as a "Diff error".
    """
    if not is_admin_msg(msg):
        return

    marker_key = "incidents_diff_last_ts"
    try:
        argv = msg.text.split()
        hours = 24
        wants_reset = False
        if len(argv) >= 2:
            arg = argv[1]
            if arg.lower() in {"reset", "clear"}:
                wants_reset = True
            else:
                try:
                    hours = max(1, int(arg))
                except ValueError:
                    hours = 24

        if wants_reset:
            runtime_state.set_state(marker_key, None)
            await msg.answer(
                "📣 Diff marker reset. Next run will show all within window.",
                reply_markup=system_logs_audit_kb,
            )
            return

        stored = runtime_state.get(marker_key)
        marker = None
        if isinstance(stored, str):
            try:
                marker = datetime.fromisoformat(stored)
            except Exception:
                marker = None

        rows = read_raw(cfg, hours=hours, limit=5000, include_old=True)

        def newer_than(threshold):
            # With no threshold every row counts as new.
            return [
                (dt, line)
                for dt, line in rows
                if not (threshold and dt <= threshold)
            ]

        fresh: list[tuple[datetime, str]] = newer_than(marker)

        # Auto-reset when the marker is not older than the last row.
        if not fresh and marker and rows and marker >= rows[-1][0]:
            marker = None
            runtime_state.set_state(marker_key, None)
            fresh = newer_than(None)

        def render(items: list[tuple[datetime, str]], limit_chars: int = 3500) -> str:
            # Stop before the joined text would exceed limit_chars.
            out = []
            used = 0
            for dt, line in items:
                entry = f"{dt.astimezone().strftime('%m-%d %H:%M')} {line}"
                cost = len(entry) + 1
                if used + cost > limit_chars:
                    break
                out.append(entry)
                used += cost
            return "\n".join(out)

        if not fresh:
            if marker:
                note = f"since {marker.astimezone().strftime('%Y-%m-%d %H:%M')}"
            else:
                note = "for the period"
            if not rows:
                await msg.answer(
                    f"📣 No new incidents {note} (window {hours}h)",
                    reply_markup=system_logs_audit_kb,
                )
                return
            sample = rows[-50:][::-1]  # newest first
            body = render(sample)
            await msg.answer(
                f"📣 No new incidents {note} (window {hours}h).\nRecent sample:\n```\n{body}\n```",
                reply_markup=system_logs_audit_kb,
                parse_mode="Markdown",
            )
            return

        fresh.sort(key=lambda pair: pair[0], reverse=True)
        body = render(fresh)
        await msg.answer(
            f"📣 New incidents (since last mark, window {hours}h): {len(fresh)}\n```\n{body}\n```",
            reply_markup=system_logs_audit_kb,
            parse_mode="Markdown",
        )
        # The marker moves to the newest entry, so rows cut off by the length
        # cap in render() are treated as seen as well.
        try:
            runtime_state.set_state(marker_key, fresh[0][0].isoformat())
        except Exception:
            pass
    except Exception as exc:
        await msg.answer(
            f"⚠️ Diff error: {type(exc).__name__}: {exc}",
            reply_markup=system_logs_audit_kb,
        )
@dp.message(F.text.startswith("/alerts_heatmap"))
@dp.message(F.text == "🔥 Heatmap")
async def alerts_heatmap(msg: Message):
    """Render an hourly text histogram of incidents.

    Usage: ``/alerts_heatmap [hours] [category]`` in any order; the window
    defaults to 48 hours and to all categories. At most the 72 most recent
    hourly slots are printed; bars are scaled to the busiest slot of the
    whole window (12 characters wide at most).
    """
    if not is_admin_msg(msg):
        return

    hours = 48
    category = None
    text = msg.text or ""
    # Only the slash command carries arguments. The keyboard label
    # "🔥 Heatmap" must not be parsed, otherwise its second word would become
    # the category filter "heatmap" and hide every incident.
    args = text.split()[1:] if text.startswith("/") else []
    for part in args:
        # isdecimal() guarantees int() succeeds (isdigit() accepts e.g. "²").
        if part.isdecimal():
            hours = max(1, int(part))
        else:
            category = part.lower()

    rows = read_raw(cfg, hours=hours, limit=8000, include_old=True)
    buckets: dict[datetime, int] = {}
    for dt, line in rows:
        cat = infer_category(line) or "n/a"
        if category and cat != category:
            continue
        slot = dt.astimezone().replace(minute=0, second=0, microsecond=0)
        buckets[slot] = buckets.get(slot, 0) + 1

    if not buckets:
        await msg.answer(f"⚠️ No incidents for heatmap (hours={hours}, category={category or 'any'})")
        return

    # The window covers `hours` hourly slots ending at the current hour.
    now = datetime.now().astimezone()
    last_slot = now.replace(minute=0, second=0, microsecond=0)
    first_slot = last_slot - timedelta(hours=hours - 1)

    # Scale against the busiest slot of the whole window without walking every
    # hour of it: a large `hours` argument no longer costs O(hours) iterations.
    in_window = (cnt for slot, cnt in buckets.items() if first_slot <= slot <= last_slot)
    max_count = max(in_window, default=0) or 1

    lines = [f"📊 Alerts heatmap (hours={hours}, category={category or 'any'})"]
    shown = min(hours, 72)
    for back in range(shown - 1, -1, -1):
        ts = last_slot - timedelta(hours=back)
        cnt = buckets.get(ts, 0)
        # Non-empty slots always get at least one bar character.
        bar_len = max(1, int(cnt / max_count * 12)) if cnt else 0
        bar = "█" * bar_len
        lines.append(f"{ts:%m-%d %H}: {cnt:>3} {bar}")
    await msg.answer("\n".join(lines), reply_markup=system_logs_audit_kb)
@dp.message(F.text == "/disk_snapshot")
async def disk_snapshot(msg: Message):
    """Build a disk report for the mount returned by ``worst_disk_usage()``.

    Falls back to ``/`` when no mount is returned and to a usage of 0 when no
    usage is returned. Errors from ``build_disk_report`` are shown in chat.
    """
    if not is_admin_msg(msg):
        return

    usage, mount = worst_disk_usage()
    if not mount:
        mount = "/"

    try:
        report = await build_disk_report(cfg, mount, usage or 0)
    except Exception as exc:
        await msg.answer(f"⚠️ Disk snapshot error: {exc}")
        return

    await msg.answer(f"💽 Disk snapshot ({mount})\n\n{report}", reply_markup=system_info_kb)
@dp.message(F.text.startswith("/alerts_log"))
async def alerts_log(msg: Message):
    """Show how many alerts were sent vs. suppressed in the last N hours.

    Usage: ``/alerts_log [hours]`` (default 24, minimum 1). Rows containing
    "[suppressed" (case-insensitive) count as suppressed; the 20 most recent
    of them are listed.
    """
    if not is_admin_msg(msg):
        return

    hours = 24
    argv = (msg.text or "").split()
    if len(argv) >= 2:
        try:
            hours = max(1, int(argv[1]))
        except ValueError:
            hours = 24

    rows = read_raw(cfg, hours=hours, limit=2000)
    suppressed = []
    sent = []
    for dt, line in rows:
        target = suppressed if "[suppressed" in line.lower() else sent
        target.append((dt, line))

    lines = [
        f"📣 Alerts log ({hours}h)",
        f"Sent: {len(sent)}, Suppressed: {len(suppressed)}",
    ]
    if suppressed:
        lines.append("\nSuppressed:")
        for dt, line in suppressed[-20:]:
            # Convert with astimezone() like the other incident views in this
            # module, so the same row shows the same time everywhere.
            lines.append(f"{dt.astimezone():%m-%d %H:%M} {line}")
    await msg.answer("\n".join(lines), reply_markup=system_logs_audit_kb)
@dp.message(F.text.startswith("/incidents_export"))
@dp.message(F.text == "📤 Export")
async def incidents_export(msg: Message):
    """Export incidents of the last N hours as a CSV or JSON document.

    Usage: ``/incidents_export [hours] [csv|json]`` (defaults: 24, csv).
    Arguments may come in either order; an integer sets the window (minimum
    1, the last one wins) and ``csv``/``json`` selects the format. Anything
    else is ignored, so the reported format always matches the file sent.
    """
    if not is_admin_msg(msg):
        return

    hours = 24
    fmt = "csv"
    text = msg.text or ""
    # The keyboard label "📤 Export" carries no arguments.
    args = text.split()[1:] if text.startswith("/") else []
    for arg in args:
        try:
            hours = max(1, int(arg))
        except ValueError:
            choice = arg.lower()
            if choice in {"csv", "json"}:
                fmt = choice

    rows = read_raw(cfg, hours=hours, limit=20000, include_old=False)
    data = [
        {
            "timestamp": dt.astimezone().isoformat(),
            "category": infer_category(line) or "n/a",
            "message": line,
        }
        for dt, line in rows
    ]

    if fmt == "json":
        payload = json.dumps(data, ensure_ascii=False, indent=2)
    else:
        sio = io.StringIO()
        writer = csv.DictWriter(sio, fieldnames=["timestamp", "category", "message"], delimiter=";")
        writer.writeheader()
        writer.writerows(data)
        payload = sio.getvalue()
    file_bytes = payload.encode("utf-8")
    fname = f"incidents_{hours}h.{fmt}"

    summary = f"📤 Incidents export ({hours}h): {len(data)} rows, format {fmt}"
    await msg.answer(summary)
    await msg.answer_document(document=BufferedInputFile(file_bytes, filename=fname))
def _parse_restic_time(raw) -> "datetime | None":
    """Parse a snapshot ``time`` value into a datetime, or return None.

    A trailing ``Z`` is read as UTC. If ``datetime.fromisoformat`` rejects the
    value, the fractional seconds are cut/padded to six digits and the parse
    is retried, because older Python versions only accept 3- or 6-digit
    fractions.
    """
    if not isinstance(raw, str) or not raw:
        return None
    text = raw.strip().replace("Z", "+00:00")
    try:
        return datetime.fromisoformat(text)
    except ValueError:
        pass

    head, dot, rest = text.partition(".")
    if not dot:
        return None
    digits = 0
    while digits < len(rest) and rest[digits].isdigit():
        digits += 1
    fraction = rest[:digits][:6].ljust(6, "0")
    try:
        return datetime.fromisoformat(f"{head}.{fraction}{rest[digits:]}")
    except ValueError:
        return None


@dp.message(F.text.in_({"/backup_sla", "📉 Backup SLA", "Backup SLA"}))
@dp.message(F.text.contains("Backup SLA"))
async def backup_sla(msg: Message):
    """Report the age of the newest restic snapshot against the backup SLA.

    Runs ``restic snapshots --json``; the SLA comes from
    ``cfg["backup"]["sla_hours"]`` (default 26). Status is green within the
    SLA, yellow up to 1.5x the SLA or when the snapshot time is unknown, and
    red beyond that.
    """
    # `backup_kb` is not among this module's top-level `keyboards` imports, so
    # every reply below used to fail with NameError. Imported locally here —
    # assumes `keyboards` exports `backup_kb`; TODO confirm.
    from keyboards import backup_kb

    if not is_admin_msg(msg):
        return

    rc, out = await run_cmd_full(["restic", "snapshots", "--json"], use_restic_env=True, timeout=40)
    if rc != 0:
        await msg.answer(f"⚠️ Restic error: {out.strip() or rc}", reply_markup=backup_kb)
        return

    try:
        snaps = json.loads(out)
    except Exception as e:
        await msg.answer(f"⚠️ Invalid restic JSON: {e}", reply_markup=backup_kb)
        return
    if not isinstance(snaps, list) or not snaps:
        await msg.answer("⚠️ No snapshots found", reply_markup=backup_kb)
        return

    # `or ""` keeps the sort key a string even if a snapshot has "time": null.
    snaps.sort(key=lambda s: s.get("time") or "", reverse=True)
    last_dt = _parse_restic_time(snaps[0].get("time"))
    if last_dt is not None:
        age_h = (datetime.now(tz=last_dt.tzinfo) - last_dt).total_seconds() / 3600
    else:
        age_h = None

    sla = float(cfg.get("backup", {}).get("sla_hours", 26))
    if age_h is None:
        status = "🟡"
    elif age_h <= sla:
        status = "🟢"
    elif age_h <= sla * 1.5:
        status = "🟡"
    else:
        status = "🔴"

    last_str = last_dt.astimezone().strftime("%Y-%m-%d %H:%M") if last_dt else "n/a"
    age_str = f"{age_h:.1f}h" if age_h is not None else "n/a"
    await msg.answer(
        f"{status} Backup SLA\n"
        f"Snapshots: {len(snaps)}\n"
        f"Last: {last_str} (age {age_str})\n"
        f"SLA: {sla}h",
        reply_markup=backup_kb,
    )
@dp.message(F.text.startswith("/docker_restarts"))
@dp.message(F.text == "♻️ Restarts")
async def docker_restarts(msg: Message):
    """List incident rows containing "docker_restart" for the last N hours.

    Usage: ``/docker_restarts [hours]`` (default 168, minimum 1). Restarts are
    counted per alias, taken as the last whitespace-separated token of the
    row; the 50 most recent rows are shown.
    """
    if not is_admin_msg(msg):
        return

    argv = msg.text.split()
    hours = 168
    if len(argv) >= 2:
        try:
            hours = max(1, int(argv[1]))
        except ValueError:
            hours = 168

    rows = read_raw(cfg, hours=hours, limit=5000, include_old=True)
    restarts = [(dt, line) for dt, line in rows if "docker_restart" in line]
    if not restarts:
        await msg.answer(f"🐳 No docker restarts in last {hours}h", reply_markup=docker_kb)
        return

    counts: dict[str, int] = {}
    for _dt, line in restarts:
        tokens = line.split()
        alias = tokens[-1] if len(tokens) >= 2 else "unknown"
        counts[alias] = counts.get(alias, 0) + 1

    ranked = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)
    top = ", ".join(f"{k}:{v}" for k, v in ranked)
    shown = [
        f"{dt.astimezone().strftime('%m-%d %H:%M')} {line}" for dt, line in restarts[-50:]
    ]
    body = "\n".join(shown)
    await msg.answer(
        f"🐳 Docker restarts ({hours}h): {len(restarts)} ({top})\n```\n{body}\n```",
        reply_markup=docker_kb,
        parse_mode="Markdown",
    )
@dp.message(F.text.startswith("/openwrt_leases_diff"))
@dp.message(F.text == "🔀 Leases diff")
async def openwrt_leases_diff(msg: Message):
    """Compare current OpenWrt leases with the previously stored list.

    The previous list lives in ``runtime_state`` under ``openwrt_leases_prev``.
    When nothing is stored yet the current list is saved as the baseline;
    otherwise added/removed entries are reported (up to 50 each) and the
    stored list is replaced with the current one.
    """
    if not is_admin_msg(msg):
        return
    await msg.answer("⏳ OpenWrt leases diff…", reply_markup=openwrt_kb)

    state_key = "openwrt_leases_prev"

    async def worker():
        try:
            current = await fetch_openwrt_leases(cfg)
        except Exception as exc:
            await msg.answer(f"⚠️ OpenWrt error: {exc}", reply_markup=openwrt_kb)
            return

        previous = runtime_state.get(state_key, [])
        if not previous:
            runtime_state.set_state(state_key, current)
            await msg.answer(f"Baseline saved ({len(current)} leases)", reply_markup=openwrt_kb)
            return

        before = set(previous)
        after = set(current)
        added = sorted(after - before)
        removed = sorted(before - after)

        report = [f"🧾 OpenWrt leases diff ({len(added)} added, {len(removed)} removed)"]
        if added:
            report.append(" Added:")
            report.extend(f" - {item}" for item in added[:50])
        if removed:
            report.append(" Removed:")
            report.extend(f" - {item}" for item in removed[:50])
        if len(added) > 50 or len(removed) > 50:
            report.append("… trimmed")

        runtime_state.set_state(state_key, current)
        await msg.answer("\n".join(report), reply_markup=openwrt_kb)

    asyncio.create_task(worker())
@dp.message(F.text.in_({"/queue_sla", "📊 Queue SLA", "Queue SLA"}))
@dp.message(F.text.contains("Queue SLA"))
async def queue_sla(msg: Message):
    """Report max / p95 wait and run times from the queue history.

    Values come from ``wait_sec`` / ``runtime_sec`` of each item returned by
    ``get_history_raw()``; averages are appended when ``get_stats()`` returns
    data.
    """
    # `backup_kb` is not among this module's top-level `keyboards` imports, so
    # every reply below used to fail with NameError. Imported locally here —
    # assumes `keyboards` exports `backup_kb`; TODO confirm.
    from keyboards import backup_kb

    if not is_admin_msg(msg):
        return

    hist = get_history_raw()
    if not hist:
        await msg.answer("🧾 Queue SLA: history is empty", reply_markup=backup_kb)
        return

    # `or 0` also covers items that store an explicit None, which would
    # otherwise break max() and sorted().
    waits = [item.get("wait_sec") or 0 for item in hist]
    runs = [item.get("runtime_sec") or 0 for item in hist]

    def p95(values: list[int]) -> float:
        # Element at index floor(0.95 * (n - 1)) of the sorted values.
        if not values:
            return 0.0
        ordered = sorted(values)
        return float(ordered[int(0.95 * (len(ordered) - 1))])

    lines = [
        "🧾 Queue SLA",
        f"Samples: {len(hist)}",
        f"Max wait: {max(waits)}s, p95 {p95(waits):.1f}s",
        f"Max run: {max(runs)}s, p95 {p95(runs):.1f}s",
    ]
    stats = get_stats()
    if stats:
        lines.append(
            f"Avg wait: {stats.get('avg_wait_sec', 0):.1f}s, "
            f"avg run: {stats.get('avg_runtime_sec', 0):.1f}s"
        )
    await msg.answer("\n".join(lines), reply_markup=backup_kb)
@dp.message(F.text == "/selftest_history")
async def selftest_history(msg: Message):
    """Show the first 15 entries of ``selftest_history`` from ``runtime_state``.

    Each line shows a status emoji, the first 16 characters of ``ts`` and the
    entry's ``summary``.
    """
    if not is_admin_msg(msg):
        return

    history = runtime_state.get("selftest_history", [])
    if not history:
        await msg.answer("🧪 Self-test history is empty", reply_markup=system_logs_audit_kb)
        return

    def render(entry):
        emoji = "🟢" if entry.get("ok", False) else "🔴"
        ts = entry.get("ts", "")[:16]
        summary = entry.get("summary", "")
        return f"{emoji} {ts} {summary}"

    report = ["🧪 Self-test history"]
    report.extend(render(entry) for entry in history[:15])
    await msg.answer("\n".join(report), reply_markup=system_logs_audit_kb)
@dp.message(F.text.startswith("/export_all"))
@dp.message(F.text == "📦 Export all")
async def export_all(msg: Message):
    """Send a zip with incidents, queue history and self-test history as CSV.

    Usage: ``/export_all [hours]`` (default 168, minimum 1); the window only
    applies to the incidents file. All CSV files use ``;`` as delimiter.
    """
    if not is_admin_msg(msg):
        return

    hours = 168
    argv = msg.text.split()
    if len(argv) >= 2:
        try:
            hours = max(1, int(argv[1]))
        except ValueError:
            hours = 168

    def to_csv(header, records):
        # Serialise a header plus rows into one CSV string.
        buf = io.StringIO()
        writer = csv.writer(buf, delimiter=";")
        writer.writerow(header)
        for record in records:
            writer.writerow(record)
        return buf.getvalue()

    def finished_iso(item):
        # Empty string when the item has no (truthy) finished_at value.
        if not item.get("finished_at"):
            return ""
        return datetime.fromtimestamp(item.get("finished_at", 0)).isoformat()

    incidents_rows = read_raw(cfg, hours=hours, limit=20000, include_old=True)
    incidents_csv = to_csv(
        ["timestamp", "category", "message"],
        (
            [dt.astimezone().isoformat(), infer_category(line) or "n/a", line]
            for dt, line in incidents_rows
        ),
    )

    queue_rows = get_history_raw()
    queue_csv = to_csv(
        ["finished_at", "label", "status", "wait_sec", "runtime_sec"],
        (
            [
                finished_iso(item),
                item.get("label", ""),
                item.get("status", ""),
                item.get("wait_sec", ""),
                item.get("runtime_sec", ""),
            ]
            for item in queue_rows
        ),
    )

    st_hist = runtime_state.get("selftest_history", [])
    selftest_csv = to_csv(
        ["timestamp", "ok", "summary"],
        (
            [entry.get("ts", ""), entry.get("ok", False), entry.get("summary", "")]
            for entry in st_hist
        ),
    )

    archive = io.BytesIO()
    with zipfile.ZipFile(archive, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        zf.writestr(f"incidents_{hours}h.csv", incidents_csv)
        zf.writestr("queue_history.csv", queue_csv)
        zf.writestr("selftest_history.csv", selftest_csv)

    summary = (
        f"📦 Export all\n"
        f"Incidents: {len(incidents_rows)} rows ({hours}h)\n"
        f"Queue history: {len(queue_rows)} rows\n"
        f"Selftest: {len(st_hist)} rows"
    )
    await msg.answer(summary)
    await msg.answer_document(
        document=BufferedInputFile(archive.getvalue(), filename="export_all.zip")
    )
@dp.message(F.text == "🔒 SSL")
async def ssl_certs(msg: Message):
    """Fetch certificates in a worker thread and send the formatted list.

    Exceptions from fetching or formatting are shown as an NPMplus error.
    """
    if not is_admin_msg(msg):
        return
    await msg.answer("⏳ Checking SSL certificates…", reply_markup=system_logs_security_kb)

    async def worker():
        try:
            report = format_certificates(await asyncio.to_thread(fetch_certificates, cfg))
        except Exception as exc:
            report = f"⚠️ NPMplus error: {exc}"
        await msg.answer(report, reply_markup=system_logs_security_kb)

    asyncio.create_task(worker())
@dp.message(F.text == "🍵 Gitea")
async def gitea_health(msg: Message):
    """Run ``get_gitea_health`` in a worker thread and send its result."""
    if not is_admin_msg(msg):
        return
    await msg.answer("⏳ Checking Gitea health…", reply_markup=system_logs_integrations_kb)

    async def worker():
        try:
            report = await asyncio.to_thread(get_gitea_health, cfg)
        except Exception as exc:
            report = f"⚠️ Gitea error: {exc}"
        await msg.answer(report, reply_markup=system_logs_integrations_kb)

    asyncio.create_task(worker())
@dp.message(F.text == "🧩 NPMplus")
async def npmplus_hosts(msg: Message):
    """List the first 10 NPMplus proxy hosts with an inline toggle button each.

    Hosts are loaded in a worker thread. Every host that has an ``id`` gets a
    button whose callback data is ``npmplus:<enable|disable>:<id>``.
    """
    if not is_admin_msg(msg):
        return
    await msg.answer("⏳ Loading NPMplus hosts…", reply_markup=system_logs_integrations_kb)

    async def worker():
        try:
            hosts = await asyncio.to_thread(list_proxy_hosts, cfg)
        except Exception as exc:
            await msg.answer(f"⚠️ NPMplus error: {exc}", reply_markup=system_logs_integrations_kb)
            return
        if not hosts:
            await msg.answer("🧩 NPMplus\n\n(no proxy hosts)", reply_markup=system_logs_integrations_kb)
            return

        report = ["🧩 NPMplus proxy hosts\n"]
        button_rows = []
        for host in hosts[:10]:
            host_id = host.get("id")
            domains = host.get("domain_names") or []
            if isinstance(domains, list):
                name = ", ".join(domains)
            else:
                name = str(domains)
            forward = f"{host.get('forward_host')}:{host.get('forward_port')}"
            enabled = host.get("enabled", True)
            icon = "🟢" if enabled else "🔴"
            # NOTE(review): name and forward are joined with no separator, and
            # the button prefix below is empty in both branches — these look
            # like characters that were lost; confirm the intended literals.
            report.append(f"{icon} {name}{forward}")
            if host_id is None:
                continue
            action = "disable" if enabled else "enable"
            button = InlineKeyboardButton(
                text=f"{('' if enabled else '')} {name[:12]}",
                callback_data=f"npmplus:{action}:{host_id}",
            )
            button_rows.append([button])

        kb = InlineKeyboardMarkup(inline_keyboard=button_rows)
        await msg.answer("\n".join(report), reply_markup=kb)

    asyncio.create_task(worker())
@dp.message(F.text == "📦 Updates")
async def updates_list(msg: Message):
    """Queue a job that lists package updates and shows the first page.

    The job stores the title and lines in ``UPDATES_CACHE`` under the sender's
    user id (page size 20) and then sends page 0.
    """
    if not is_admin_msg(msg):
        return

    user_id = msg.from_user.id

    async def job():
        title, lines = await list_updates()
        UPDATES_CACHE[user_id] = {"title": title, "lines": lines, "page_size": 20}
        await send_updates_page(msg, user_id, 0, edit=False)

    position = await enqueue("pkg-updates", job)
    await msg.answer(f"🕓 Updates queued (#{position})", reply_markup=system_ops_kb)
@dp.message(F.text == "⬆️ Upgrade")
async def updates_apply(msg: Message):
    """Ask for confirmation before a package upgrade.

    Sends an inline keyboard with ``upgrade:confirm`` / ``upgrade:cancel``
    callbacks; the upgrade itself is not started here.
    """
    if not is_admin_msg(msg):
        return

    confirm = InlineKeyboardButton(text="✅ Confirm", callback_data="upgrade:confirm")
    cancel = InlineKeyboardButton(text="✖ Cancel", callback_data="upgrade:cancel")
    prompt_kb = InlineKeyboardMarkup(inline_keyboard=[[confirm, cancel]])
    await msg.answer("⚠️ Confirm package upgrade?", reply_markup=prompt_kb)
@dp.message(F.text == "🔄 Reboot")
async def reboot_request(msg: Message):
    """Ask for confirmation before a reboot.

    Sends an inline keyboard with ``reboot:confirm`` / ``reboot:cancel``
    callbacks; nothing is rebooted here.
    """
    if not is_admin_msg(msg):
        return

    confirm = InlineKeyboardButton(text="✅ Confirm", callback_data="reboot:confirm")
    cancel = InlineKeyboardButton(text="✖ Cancel", callback_data="reboot:cancel")
    prompt_kb = InlineKeyboardMarkup(inline_keyboard=[[confirm, cancel]])
    await msg.answer("⚠️ Confirm reboot?", reply_markup=prompt_kb)
@dp.message(F.text == "🧱 Hardware")
async def hw(msg: Message):
    """Answer the "🧱 Hardware" button with the output of ``hardware()``.

    Senders that do not pass ``is_admin_msg`` get no reply.
    """
    if not is_admin_msg(msg):
        return
    await msg.answer(hardware(), reply_markup=system_info_kb)
def _updates_kb(page: int, total_pages: int) -> InlineKeyboardMarkup:
    """Build the Prev/Next inline keyboard for the updates pager.

    "Prev" is offered when ``page > 0`` and "Next" when ``page`` is not the
    last page. With a single page, or no applicable button, the keyboard is
    empty.
    """
    nav = []
    if total_pages > 1:
        if page > 0:
            nav.append(InlineKeyboardButton(text="⬅️ Prev", callback_data=f"updpage:{page-1}"))
        if page < total_pages - 1:
            nav.append(InlineKeyboardButton(text="Next ➡️", callback_data=f"updpage:{page+1}"))
    layout = [nav] if nav else []
    return InlineKeyboardMarkup(inline_keyboard=layout)
async def send_updates_page(msg: Message, user_id: int, page: int, edit: bool):
    """Send or edit one page of the cached updates list for ``user_id``.

    ``page`` is clamped to the valid range. With ``edit=True`` the given
    message is edited in place, otherwise a new message is sent. Replies with
    a warning when nothing is cached for the user.
    """
    cached = UPDATES_CACHE.get(user_id)
    if not cached:
        await msg.answer("⚠️ Updates cache empty", reply_markup=system_ops_kb)
        return

    lines = cached["lines"]
    page_size = cached["page_size"]

    # Ceiling division, with at least one page even for an empty list.
    full_pages, leftover = divmod(len(lines), page_size)
    total_pages = max(1, full_pages + (1 if leftover else 0))
    page = max(0, min(page, total_pages - 1))

    first = page * page_size
    body = "\n".join(lines[first:first + page_size])
    text = f"{cached['title']} (page {page+1}/{total_pages})\n```\n{body}\n```"

    deliver = msg.edit_text if edit else msg.answer
    await deliver(text, reply_markup=_updates_kb(page, total_pages), parse_mode="Markdown")
@dp.callback_query(F.data.startswith("updpage:"))
async def updates_page(cb: CallbackQuery):
    """Handle ``updpage:<n>`` callbacks by editing the pager message.

    Only users with an entry in ``UPDATES_CACHE`` are served; a non-integer
    page number is answered with "Bad page".
    """
    user_id = cb.from_user.id
    if user_id not in UPDATES_CACHE:
        await cb.answer("No cached updates")
        return

    _prefix, raw_page = cb.data.split(":", 1)
    try:
        page = int(raw_page)
    except ValueError:
        await cb.answer("Bad page")
        return

    await cb.answer()
    await send_updates_page(cb.message, user_id, page, edit=True)
@dp.callback_query(F.data == "upgrade:confirm")
async def upgrade_confirm(cb: CallbackQuery):
    """Queue the package upgrade after the admin confirmed it.

    Callbacks from any user other than ``cfg["telegram"]["admin_id"]`` are
    ignored. When ``cfg["safety"]["dry_run"]`` is set the queued job only
    reports that the upgrade was skipped.
    """
    if cb.from_user.id != cfg["telegram"]["admin_id"]:
        return
    await cb.answer()

    async def job():
        dry_run = cfg.get("safety", {}).get("dry_run", False)
        if dry_run:
            await cb.message.answer("🧪 Dry-run: upgrade skipped", reply_markup=system_ops_kb)
            return
        result = await apply_updates()
        await cb.message.answer(result, reply_markup=system_ops_kb, parse_mode="Markdown")

    position = await enqueue("pkg-upgrade", job)
    await cb.message.answer(f"🕓 Upgrade queued (#{position})", reply_markup=system_ops_kb)
@dp.callback_query(F.data == "upgrade:cancel")
async def upgrade_cancel(cb: CallbackQuery):
    """Dismiss the upgrade confirmation prompt by deleting its message.

    NOTE(review): unlike ``upgrade_confirm`` there is no admin id check here —
    confirm that is intended.
    """
    prompt = cb.message
    await cb.answer("Cancelled")
    await prompt.delete()
@dp.callback_query(F.data == "reboot:confirm")
async def reboot_confirm(cb: CallbackQuery):
    """Mark the admin as awaiting the reboot password and ask for it.

    Callbacks from any user other than ``cfg["telegram"]["admin_id"]`` are
    ignored. The pending flag is an empty dict stored in ``REBOOT_PENDING``
    under the user's id.
    """
    user_id = cb.from_user.id
    if user_id != cfg["telegram"]["admin_id"]:
        return
    await cb.answer()
    REBOOT_PENDING[user_id] = {}
    await cb.message.answer("🔐 Send reboot password", reply_markup=system_ops_kb)
@dp.callback_query(F.data == "reboot:cancel")
async def reboot_cancel(cb: CallbackQuery):
    """Dismiss the reboot confirmation prompt by deleting its message.

    NOTE(review): unlike ``reboot_confirm`` there is no admin id check here —
    confirm that is intended.
    """
    prompt = cb.message
    await cb.answer("Cancelled")
    await prompt.delete()
@dp.callback_query(F.data.startswith("npmplus:"))
async def npmplus_toggle(cb: CallbackQuery):
    """Enable or disable an NPMplus proxy host from an inline button.

    Expects callback data of the form ``npmplus:<action>:<id>``. The host is
    enabled only when the action is exactly ``enable``; any other action
    disables it. ``set_proxy_host`` runs in a worker thread.
    """
    if cb.from_user.id != cfg["telegram"]["admin_id"]:
        return

    fields = cb.data.split(":")
    if len(fields) != 3:
        await cb.answer("Bad request")
        return

    _prefix, action, raw_id = fields
    try:
        host_id = int(raw_id)
    except ValueError:
        await cb.answer("Bad id")
        return

    await cb.answer("Working…")
    ok, info = await asyncio.to_thread(set_proxy_host, cfg, host_id, action == "enable")
    if not ok:
        await cb.message.answer(f"❌ NPMplus error: {info}", reply_markup=system_logs_integrations_kb)
        return
    await cb.message.answer("✅ Updated", reply_markup=system_logs_integrations_kb)
@dp.message(F.text, F.func(lambda msg: msg.from_user and msg.from_user.id in REBOOT_PENDING))
async def reboot_password(msg: Message):
    """Check the reboot password sent by a user with a pending reboot request.

    The pending flag is cleared on every attempt, so a wrong password requires
    a new confirmation. On success a ``reboot`` job is queued; it runs
    ``sudo reboot`` unless ``cfg["safety"]["dry_run"]`` is set.
    """
    # Local import: this module does not import hmac at the top level.
    import hmac

    if not is_admin_msg(msg):
        return

    REBOOT_PENDING.pop(msg.from_user.id, None)
    expected = cfg.get("security", {}).get("reboot_password")
    if not expected:
        await msg.answer("⚠️ Reboot password not configured", reply_markup=system_ops_kb)
        return

    supplied = (msg.text or "").strip()
    # Constant-time comparison of the secret. str() lets a password that the
    # config stores as a number match the text typed by the user, and bytes
    # are compared because compare_digest rejects non-ASCII str arguments.
    matches = hmac.compare_digest(
        supplied.encode("utf-8"),
        str(expected).encode("utf-8"),
    )
    if not matches:
        await msg.answer("❌ Wrong password", reply_markup=system_ops_kb)
        return

    async def job():
        if cfg.get("safety", {}).get("dry_run", False):
            await msg.answer("🧪 Dry-run: reboot skipped", reply_markup=system_ops_kb)
            return
        await msg.answer("🔄 Rebooting…", reply_markup=system_ops_kb)
        await run_cmd(["sudo", "reboot"], timeout=10)

    pos = await enqueue("reboot", job)
    await msg.answer(f"🕓 Reboot queued (#{pos})", reply_markup=system_ops_kb)