Files
tg-admin-bot/handlers/system.py
2026-02-08 02:54:09 +03:00

514 lines
17 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import os
from aiogram import F
from aiogram.types import Message, CallbackQuery, InlineKeyboardMarkup, InlineKeyboardButton
from app import dp, cfg
from auth import is_admin_msg
from keyboards import (
system_info_kb,
system_ops_kb,
system_logs_audit_kb,
system_logs_security_kb,
system_logs_integrations_kb,
)
from system_checks import security, disks, hardware, list_disks, smart_last_test
from services.http_checks import get_url_checks, check_url
from services.queue import enqueue
from services.updates import list_updates, apply_updates
from services.runner import run_cmd
from services.npmplus import fetch_certificates, format_certificates, list_proxy_hosts, set_proxy_host
from services.gitea import get_gitea_health
from services.openwrt import get_openwrt_status
import state
from state import UPDATES_CACHE, REBOOT_PENDING
from services.metrics import summarize
from services.audit import read_audit_tail
from services.incidents import read_recent, incidents_path
from services.external_checks import format_report
@dp.message(F.text == "💽 Disks")
async def sd(msg: Message):
    """Answer the "💽 Disks" button with the text returned by ``disks()``.

    The message is ignored unless ``is_admin_msg(msg)`` passes.
    """
    if not is_admin_msg(msg):
        return
    report = disks()
    await msg.answer(report, reply_markup=system_info_kb)
@dp.message(F.text == "🔐 Security")
async def sec(msg: Message):
    """Answer the "🔐 Security" button with the text returned by ``security()``.

    The message is ignored unless ``is_admin_msg(msg)`` passes.
    """
    if not is_admin_msg(msg):
        return
    report = security()
    await msg.answer(report, reply_markup=system_info_kb)
@dp.message(F.text == "🌐 URLs")
async def urls(msg: Message):
    """Check every configured URL and report one status line per alias.

    The checks come from ``get_url_checks(cfg)`` as ``(alias, url)`` pairs.
    A progress message is sent first; the actual checks run in a background
    task that posts the final report when all of them have finished.
    """
    if not is_admin_msg(msg):
        return

    checks = list(get_url_checks(cfg))
    if not checks:
        await msg.answer("⚠️ Нет URL для проверки", reply_markup=system_logs_security_kb)
        return
    await msg.answer("⏳ Проверяю URL…", reply_markup=system_logs_security_kb)

    def describe(alias, outcome):
        # ``check_url`` yields (ok, status, ms, err) for each URL.
        ok, status, ms, err = outcome
        if ok:
            return f"🟢 {alias}: {status} ({ms}ms)"
        if status is not None:
            return f"🔴 {alias}: {status} ({ms}ms)"
        # No HTTP status at all: show the error text instead.
        reason = err or "error"
        return f"🔴 {alias}: {reason} ({ms}ms)"

    async def worker():
        # ``check_url`` is called through to_thread, one thread per URL,
        # and all results are awaited together.
        outcomes = await asyncio.gather(
            *(asyncio.to_thread(check_url, target) for _, target in checks)
        )
        report = ["🌐 URLs\n"]
        for (alias, _target), outcome in zip(checks, outcomes):
            report.append(describe(alias, outcome))
        await msg.answer("\n".join(report), reply_markup=system_logs_security_kb)

    # Fire-and-forget: the handler returns while the worker keeps running.
    asyncio.create_task(worker())
@dp.message(F.text == "🔑 SSH log")
async def ssh_log(msg: Message):
    """Show up to the 30 most recent "Accepted" SSH log lines.

    The journal (tags ``sshd`` and ``sshd-session``, since 00:00) is queried
    first; if that yields nothing, the last 200 lines of
    ``/var/log/auth.log`` are used instead. The work runs in a background
    task after a progress message has been sent.
    """
    if not is_admin_msg(msg):
        return
    await msg.answer("⏳ Loading SSH logins…", reply_markup=system_logs_security_kb)

    def accepted_only(rc, output):
        # A failed command or blank output contributes no lines.
        if rc != 0 or not output.strip():
            return []
        return [entry for entry in output.splitlines() if "Accepted" in entry]

    async def worker():
        journal_cmd = [
            "journalctl",
            "-t",
            "sshd",
            "-t",
            "sshd-session",
            "--since",
            "00:00",
            "--no-pager",
        ]
        rc, out = await run_cmd(journal_cmd, timeout=20)
        logins: list[str] = accepted_only(rc, out)

        if not logins:
            # Fallback when the journal gave us nothing.
            rc2, out2 = await run_cmd(["tail", "-n", "200", "/var/log/auth.log"], timeout=10)
            logins = accepted_only(rc2, out2)

        if not logins:
            await msg.answer("🔑 SSH log\n\n(no logins today)", reply_markup=system_logs_security_kb)
            return

        pretty = [_format_ssh_line(entry) for entry in logins[-30:]]
        text = "🔑 SSH logins (today)\n```\n" + "\n".join(pretty) + "\n```"
        await msg.answer(text, reply_markup=system_logs_security_kb, parse_mode="Markdown")

    # Fire-and-forget: the handler returns while the worker keeps running.
    asyncio.create_task(worker())
def _format_ssh_line(line: str) -> str:
parts = line.split()
if len(parts) < 10:
return line
month, day, time_str = parts[0], parts[1], parts[2]
user = "?"
ip = "?"
key_type = "?"
if "for" in parts:
i = parts.index("for")
if i + 1 < len(parts):
user = parts[i + 1]
if "from" in parts:
i = parts.index("from")
if i + 1 < len(parts):
ip = parts[i + 1]
if "ssh2:" in parts:
i = parts.index("ssh2:")
if i + 1 < len(parts):
key_type = parts[i + 1]
return f"{month} {day} {time_str} {user} @ {ip} ({key_type})"
@dp.message(F.text == "📈 Metrics")
async def metrics(msg: Message):
    """Answer the "📈 Metrics" button with a 15-minute summary of the metrics store.

    Sends a warning instead when ``state.METRICS_STORE`` is still ``None``.
    """
    if not is_admin_msg(msg):
        return

    store = state.METRICS_STORE
    if store is None:
        await msg.answer("⚠️ Metrics not initialized", reply_markup=system_info_kb)
        return

    summary = summarize(store, minutes=15)
    await msg.answer(summary, reply_markup=system_info_kb)
@dp.message(F.text == "🧪 SMART test")
async def smart_test(msg: Message):
    """Start a short SMART self-test on every disk from ``list_disks()``.

    Each disk is handled in turn via ``sudo smartctl -t short <dev>``. The
    per-disk result is either the command output on failure, or the first
    output line mentioning "Please wait" / "will complete" (falling back
    to "started"). The report is sent from a background task.
    """
    if not is_admin_msg(msg):
        return

    disks_list = list_disks()
    if not disks_list:
        await msg.answer("💽 Disks\n\n❌ No disks found", reply_markup=system_info_kb)
        return
    await msg.answer("🧪 Starting SMART short tests…", reply_markup=system_info_kb)

    async def worker():
        report = ["🧪 SMART short tests\n"]
        for dev in disks_list:
            rc, out = await run_cmd(["sudo", "smartctl", "-t", "short", dev], timeout=20)
            if rc != 0:
                report.append(f"🔴 {dev}: {out.strip() or 'error'}")
                continue
            # First informative line wins; default when none is present.
            summary = next(
                (
                    entry.strip()
                    for entry in out.splitlines()
                    if "Please wait" in entry or "will complete" in entry
                ),
                "started",
            )
            report.append(f"🟢 {dev}: {summary}")
        await msg.answer("\n".join(report), reply_markup=system_info_kb)

    # Fire-and-forget: the handler returns while the worker keeps running.
    asyncio.create_task(worker())
@dp.message(F.text == "🧪 SMART status")
async def smart_status(msg: Message):
    """List ``smart_last_test(dev)`` for every disk returned by ``list_disks()``."""
    if not is_admin_msg(msg):
        return

    disks_list = list_disks()
    if not disks_list:
        await msg.answer("💽 Disks\n\n❌ No disks found", reply_markup=system_info_kb)
        return

    report = ["🧪 SMART last tests\n"]
    report.extend(f"{dev}: {smart_last_test(dev)}" for dev in disks_list)
    await msg.answer("\n".join(report), reply_markup=system_info_kb)
@dp.message(F.text == "📡 OpenWrt")
async def openwrt_status(msg: Message):
    """Fetch the OpenWrt status text in a background task and send it.

    Any exception raised by ``get_openwrt_status(cfg)`` is turned into a
    warning message rather than propagated.
    """
    if not is_admin_msg(msg):
        return
    await msg.answer("⏳ Checking OpenWrt…", reply_markup=system_info_kb)

    async def worker():
        try:
            reply = await get_openwrt_status(cfg)
        except Exception as exc:
            reply = f"⚠️ OpenWrt error: {exc}"
        await msg.answer(reply, reply_markup=system_info_kb)

    # Fire-and-forget: the handler returns while the worker keeps running.
    asyncio.create_task(worker())
@dp.message(F.text == "🧾 Audit")
async def audit_log(msg: Message):
    """Send the tail of the audit log (``read_audit_tail`` with ``limit=200``).

    The text is sent without a parse mode when it matches one of the
    plain-text prefixes below, and as Markdown otherwise.
    """
    if not is_admin_msg(msg):
        return

    text = read_audit_tail(cfg, limit=200)
    # NOTE(review): the second prefix is an empty string as written, which
    # matches every text, so the Markdown path is never taken. A character
    # may have been lost here; confirm the intended prefix.
    send_plain = text.startswith("⚠️") or text.startswith("")
    extra = {} if send_plain else {"parse_mode": "Markdown"}
    await msg.answer(text, reply_markup=system_logs_audit_kb, **extra)
@dp.message(F.text == "🌍 External")
async def external_checks(msg: Message):
    """Answer the "🌍 External" button with ``format_report(cfg)``."""
    if not is_admin_msg(msg):
        return
    report = format_report(cfg)
    await msg.answer(report, reply_markup=system_logs_security_kb)
@dp.message(F.text == "📣 Incidents")
async def incidents(msg: Message):
    """Summarise recent incidents: 24h and 7d counts plus the last 30 entries.

    A warning is sent when the file at ``incidents_path(cfg)`` does not exist.
    """
    if not is_admin_msg(msg):
        return

    if not os.path.exists(incidents_path(cfg)):
        await msg.answer("⚠️ Incidents log not found", reply_markup=system_logs_audit_kb)
        return

    last_24h = read_recent(cfg, hours=24, limit=500)
    last_7d = read_recent(cfg, hours=24 * 7, limit=1000)

    # Show at most the 30 newest entries from the 24h window.
    if last_24h:
        shown = last_24h[-30:]
    else:
        shown = ["(no incidents)"]
    body = "\n".join(shown)

    text = (
        f"📣 Incidents\n\nLast 24h: {len(last_24h)}\nLast 7d: {len(last_7d)}\n\n"
        f"Recent (24h):\n```\n{body}\n```"
    )
    await msg.answer(text, reply_markup=system_logs_audit_kb, parse_mode="Markdown")
@dp.message(F.text == "🔒 SSL")
async def ssl_certs(msg: Message):
    """Fetch certificates via ``fetch_certificates(cfg)`` and send them formatted.

    The fetch is run through ``asyncio.to_thread`` inside a background task;
    any exception becomes a warning message.
    """
    if not is_admin_msg(msg):
        return
    await msg.answer("⏳ Checking SSL certificates…", reply_markup=system_logs_security_kb)

    async def worker():
        try:
            fetched = await asyncio.to_thread(fetch_certificates, cfg)
            reply = format_certificates(fetched)
        except Exception as exc:
            reply = f"⚠️ NPMplus error: {exc}"
        await msg.answer(reply, reply_markup=system_logs_security_kb)

    # Fire-and-forget: the handler returns while the worker keeps running.
    asyncio.create_task(worker())
@dp.message(F.text == "🍵 Gitea")
async def gitea_health(msg: Message):
    """Send the result of ``get_gitea_health(cfg)`` from a background task.

    The call is run through ``asyncio.to_thread``; any exception becomes a
    warning message.
    """
    if not is_admin_msg(msg):
        return
    await msg.answer("⏳ Checking Gitea health…", reply_markup=system_logs_integrations_kb)

    async def worker():
        try:
            reply = await asyncio.to_thread(get_gitea_health, cfg)
        except Exception as exc:
            reply = f"⚠️ Gitea error: {exc}"
        await msg.answer(reply, reply_markup=system_logs_integrations_kb)

    # Fire-and-forget: the handler returns while the worker keeps running.
    asyncio.create_task(worker())
@dp.message(F.text == "🧩 NPMplus")
async def npmplus_hosts(msg: Message):
    """List the first 10 NPMplus proxy hosts with one toggle button per host.

    Each host with an ``id`` gets an inline button whose callback data is
    ``npmplus:<enable|disable>:<id>``; the action offered is the opposite of
    the host's current ``enabled`` flag (treated as enabled when missing).
    """
    if not is_admin_msg(msg):
        return
    await msg.answer("⏳ Loading NPMplus hosts…", reply_markup=system_logs_integrations_kb)

    async def worker():
        try:
            hosts = await asyncio.to_thread(list_proxy_hosts, cfg)
        except Exception as exc:
            await msg.answer(f"⚠️ NPMplus error: {exc}", reply_markup=system_logs_integrations_kb)
            return
        if not hosts:
            await msg.answer("🧩 NPMplus\n\n(no proxy hosts)", reply_markup=system_logs_integrations_kb)
            return

        summary = ["🧩 NPMplus proxy hosts\n"]
        button_rows = []
        for h in hosts[:10]:
            domains = h.get("domain_names") or []
            if isinstance(domains, list):
                name = ", ".join(domains)
            else:
                name = str(domains)
            forward = f"{h.get('forward_host')}:{h.get('forward_port')}"
            enabled = h.get("enabled", True)
            icon = "🟢" if enabled else "🔴"
            summary.append(f"{icon} {name}{forward}")

            hid = h.get("id")
            if hid is None:
                # Without an id there is nothing to target with a toggle.
                continue
            action = "disable" if enabled else "enable"
            toggle = InlineKeyboardButton(
                text=f"{('' if enabled else '')} {name[:12]}",
                callback_data=f"npmplus:{action}:{hid}",
            )
            button_rows.append([toggle])

        kb = InlineKeyboardMarkup(inline_keyboard=button_rows)
        await msg.answer("\n".join(summary), reply_markup=kb)

    # Fire-and-forget: the handler returns while the worker keeps running.
    asyncio.create_task(worker())
@dp.message(F.text == "📦 Updates")
async def updates_list(msg: Message):
    """Queue a "pkg-updates" job that lists updates and shows page 0.

    The job stores the title and lines from ``list_updates()`` in
    ``UPDATES_CACHE`` under the sender's user id (page size 20) and then
    sends the first page. The handler itself only reports the queue position.
    """
    if not is_admin_msg(msg):
        return

    async def job():
        title, lines = await list_updates()
        uid = msg.from_user.id
        UPDATES_CACHE[uid] = {"title": title, "lines": lines, "page_size": 20}
        await send_updates_page(msg, uid, 0, edit=False)

    position = await enqueue("pkg-updates", job)
    await msg.answer(f"🕓 Updates queued (#{position})", reply_markup=system_ops_kb)
@dp.message(F.text == "⬆️ Upgrade")
async def updates_apply(msg: Message):
    """Ask for confirmation before a package upgrade.

    Sends a prompt with Confirm / Cancel inline buttons whose callback data
    is ``upgrade:confirm`` and ``upgrade:cancel``.
    """
    if not is_admin_msg(msg):
        return

    confirm_btn = InlineKeyboardButton(text="✅ Confirm", callback_data="upgrade:confirm")
    cancel_btn = InlineKeyboardButton(text="✖ Cancel", callback_data="upgrade:cancel")
    prompt_kb = InlineKeyboardMarkup(inline_keyboard=[[confirm_btn, cancel_btn]])
    await msg.answer("⚠️ Confirm package upgrade?", reply_markup=prompt_kb)
@dp.message(F.text == "🔄 Reboot")
async def reboot_request(msg: Message):
    """Ask for confirmation before a reboot.

    Sends a prompt with Confirm / Cancel inline buttons whose callback data
    is ``reboot:confirm`` and ``reboot:cancel``.
    """
    if not is_admin_msg(msg):
        return

    confirm_btn = InlineKeyboardButton(text="✅ Confirm", callback_data="reboot:confirm")
    cancel_btn = InlineKeyboardButton(text="✖ Cancel", callback_data="reboot:cancel")
    prompt_kb = InlineKeyboardMarkup(inline_keyboard=[[confirm_btn, cancel_btn]])
    await msg.answer("⚠️ Confirm reboot?", reply_markup=prompt_kb)
@dp.message(F.text == "🧱 Hardware")
async def hw(msg: Message):
    """Answer the "🧱 Hardware" button with the text returned by ``hardware()``.

    The message is ignored unless ``is_admin_msg(msg)`` passes.
    """
    if not is_admin_msg(msg):
        return
    report = hardware()
    await msg.answer(report, reply_markup=system_info_kb)
def _updates_kb(page: int, total_pages: int) -> InlineKeyboardMarkup:
    """Build the Prev/Next navigation keyboard for the updates pager.

    ``page`` is zero-based. The keyboard has no rows when there is only one
    page; otherwise it has a single row containing "Prev" (unless on the
    first page) and "Next" (unless on the last page).
    """
    nav = []
    if total_pages > 1:
        if page > 0:
            nav.append(InlineKeyboardButton(text="⬅️ Prev", callback_data=f"updpage:{page-1}"))
        if page < total_pages - 1:
            nav.append(InlineKeyboardButton(text="Next ➡️", callback_data=f"updpage:{page+1}"))
    rows = [nav] if nav else []
    return InlineKeyboardMarkup(inline_keyboard=rows)
async def send_updates_page(msg: Message, user_id: int, page: int, edit: bool):
    """Render one page of the cached updates list for ``user_id``.

    Args:
        msg: Message to answer, or to edit in place when ``edit`` is true.
        user_id: Key into ``UPDATES_CACHE``.
        page: Zero-based page index; clamped into the valid range.
        edit: Edit ``msg`` instead of sending a new message.

    Sends a warning when nothing is cached for the user.
    """
    cached = UPDATES_CACHE.get(user_id)
    if not cached:
        await msg.answer("⚠️ Updates cache empty", reply_markup=system_ops_kb)
        return

    lines = cached["lines"]
    page_size = cached["page_size"]

    # Ceiling division, with at least one page even for an empty list.
    total_pages = max(1, (len(lines) + page_size - 1) // page_size)
    page = max(0, min(page, total_pages - 1))

    start = page * page_size
    body = "\n".join(lines[start:start + page_size])
    text = f"{cached['title']} (page {page+1}/{total_pages})\n```\n{body}\n```"

    deliver = msg.edit_text if edit else msg.answer
    await deliver(text, reply_markup=_updates_kb(page, total_pages), parse_mode="Markdown")
@dp.callback_query(F.data.startswith("updpage:"))
async def updates_page(cb: CallbackQuery):
    """Handle a pager button: re-render the updates message at the requested page.

    Callback data has the form ``updpage:<n>``. Users without an entry in
    ``UPDATES_CACHE`` and non-integer page values get a short notice.
    """
    if cb.from_user.id not in UPDATES_CACHE:
        await cb.answer("No cached updates")
        return

    # Everything after the first ":" is the requested page number.
    _prefix, _sep, raw_page = cb.data.partition(":")
    try:
        page = int(raw_page)
    except ValueError:
        await cb.answer("Bad page")
        return

    await cb.answer()
    await send_updates_page(cb.message, cb.from_user.id, page, edit=True)
@dp.callback_query(F.data == "upgrade:confirm")
async def upgrade_confirm(cb: CallbackQuery):
    """Queue a "pkg-upgrade" job once the upgrade has been confirmed.

    Callbacks from anyone other than ``cfg["telegram"]["admin_id"]`` are
    ignored. The queued job sends the text returned by ``apply_updates()``
    as Markdown.
    """
    is_admin = cb.from_user.id == cfg["telegram"]["admin_id"]
    if not is_admin:
        return
    await cb.answer()

    async def job():
        result = await apply_updates()
        await cb.message.answer(result, reply_markup=system_ops_kb, parse_mode="Markdown")

    position = await enqueue("pkg-upgrade", job)
    await cb.message.answer(f"🕓 Upgrade queued (#{position})", reply_markup=system_ops_kb)
@dp.callback_query(F.data == "upgrade:cancel")
async def upgrade_cancel(cb: CallbackQuery):
    """Dismiss the upgrade confirmation prompt.

    Callbacks from anyone other than ``cfg["telegram"]["admin_id"]`` are
    ignored, using the same guard as the ``upgrade:confirm`` handler, so
    only the admin can delete the prompt message.
    """
    if cb.from_user.id != cfg["telegram"]["admin_id"]:
        return
    await cb.answer("Cancelled")
    await cb.message.delete()
@dp.callback_query(F.data == "reboot:confirm")
async def reboot_confirm(cb: CallbackQuery):
    """Mark the admin as awaiting a reboot password and prompt for it.

    Callbacks from anyone other than ``cfg["telegram"]["admin_id"]`` are
    ignored. The user's id is recorded in ``REBOOT_PENDING``.
    """
    user_id = cb.from_user.id
    if user_id != cfg["telegram"]["admin_id"]:
        return
    await cb.answer()
    REBOOT_PENDING[user_id] = {}
    await cb.message.answer("🔐 Send reboot password", reply_markup=system_ops_kb)
@dp.callback_query(F.data == "reboot:cancel")
async def reboot_cancel(cb: CallbackQuery):
    """Dismiss the reboot confirmation prompt.

    Callbacks from anyone other than ``cfg["telegram"]["admin_id"]`` are
    ignored, using the same guard as the ``reboot:confirm`` handler, so
    only the admin can delete the prompt message.
    """
    if cb.from_user.id != cfg["telegram"]["admin_id"]:
        return
    await cb.answer("Cancelled")
    await cb.message.delete()
@dp.callback_query(F.data.startswith("npmplus:"))
async def npmplus_toggle(cb: CallbackQuery):
    """Enable or disable an NPMplus proxy host from an inline button.

    Callback data must be ``npmplus:<action>:<id>`` with an integer id.
    Only the exact action ``enable`` enables the host; any other action
    disables it. Callbacks from anyone other than
    ``cfg["telegram"]["admin_id"]`` are ignored.
    """
    if cb.from_user.id != cfg["telegram"]["admin_id"]:
        return

    try:
        # Exactly three ":"-separated fields are required.
        _prefix, action, raw_id = cb.data.split(":")
    except ValueError:
        await cb.answer("Bad request")
        return

    try:
        host_id = int(raw_id)
    except ValueError:
        await cb.answer("Bad id")
        return

    await cb.answer("Working…")
    ok, info = await asyncio.to_thread(set_proxy_host, cfg, host_id, action == "enable")
    reply = "✅ Updated" if ok else f"❌ NPMplus error: {info}"
    await cb.message.answer(reply, reply_markup=system_logs_integrations_kb)
@dp.message(F.text, F.func(lambda msg: msg.from_user and msg.from_user.id in REBOOT_PENDING))
async def reboot_password(msg: Message):
    """Verify the reboot password and, if it matches, queue the reboot.

    Runs only for text messages from users present in ``REBOOT_PENDING``.
    The pending entry is removed before the password is checked, so every
    attempt (right or wrong) requires a fresh confirmation.

    The expected password is read from ``cfg["security"]["reboot_password"]``.
    It is coerced to ``str`` so that a non-string config value (for example
    a purely numeric password) can still match, and compared with
    ``hmac.compare_digest`` to avoid a timing-dependent comparison.
    """
    import hmac  # local import keeps this handler self-contained

    if not is_admin_msg(msg):
        return
    REBOOT_PENDING.pop(msg.from_user.id, None)

    expected = cfg.get("security", {}).get("reboot_password")
    if not expected:
        await msg.answer("⚠️ Reboot password not configured", reply_markup=system_ops_kb)
        return

    supplied = (msg.text or "").strip()
    # Compare as bytes: compare_digest rejects non-ASCII str arguments.
    if not hmac.compare_digest(supplied.encode("utf-8"), str(expected).encode("utf-8")):
        await msg.answer("❌ Wrong password", reply_markup=system_ops_kb)
        return

    async def job():
        await msg.answer("🔄 Rebooting…", reply_markup=system_ops_kb)
        await run_cmd(["sudo", "reboot"], timeout=10)

    pos = await enqueue("reboot", job)
    await msg.answer(f"🕓 Reboot queued (#{pos})", reply_markup=system_ops_kb)